Class: Tamashii::Client::Base

Inherits:
Object
  • Object
show all
Defined in:
lib/tamashii/client/base.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Base

Returns a new instance of Base.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/tamashii/client/base.rb', line 21

# Builds the WebSocket URL from the client configuration, prepares the
# write buffer, the task queue and the NIO selector, then starts the
# worker thread that executes #run.
def initialize
  config = Tamashii::Client.config
  scheme = config.use_ssl ? "wss" : "ws"
  # The entry point is used as the URL path, so it must start with "/".
  path = config.entry_point
  path = "/#{path}" unless path.start_with?("/")
  @url = "#{scheme}://#{config.host}:#{config.port}#{path}"

  @callbacks = {}

  # Pending outgoing data: the chunk currently being written plus the queue behind it.
  @write_head = nil
  @write_buffer = Queue.new

  @nio = NIO::Selector.new
  @todo = Queue.new

  # Lifecycle flags.
  @stopping = false
  @closing = false
  @opened = false

  # Started last so every piece of state above exists before #run touches it.
  @thread = Thread.new { run }
end

Instance Attribute Details

#url ⇒ Object (readonly)

Returns the value of attribute url.



15
16
17
# File 'lib/tamashii/client/base.rb', line 15

# Returns the WebSocket URL (ws:// or wss://) that was built in #initialize.
def url
  @url
end

Instance Method Details

#abort_open_socket_task ⇒ Object



60
61
62
# File 'lib/tamashii/client/base.rb', line 60

# Cancels the scheduled socket-opening task, if one exists.
# Returns nil when no task has been scheduled, otherwise the result of #cancel.
def abort_open_socket_task
  pending_task = @open_socket_task
  return if pending_task.nil?

  pending_task.cancel
end

#call_callback(event, *args, &block) ⇒ Object



192
193
194
# File 'lib/tamashii/client/base.rb', line 192

# Invokes the callback registered for +event+, forwarding +args+ and +block+.
# Returns nil when no callback is registered, otherwise the callback's result.
def call_callback(event, *args, &block)
  handler = @callbacks[event]
  return if handler.nil?

  handler.call(*args, &block)
end

#close ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
# File 'lib/tamashii/client/base.rb', line 38

# Requests a shutdown of the client.
#
# When the connection is opened, a driver close is queued; otherwise the
# pending socket-opening task is aborted and the worker is stopped right
# away. In both cases the call then waits for the worker thread if it is
# still running.
def close
  @closing = true
  if opened?
    close_driver
  else
    logger.info "Closing: server is not connected, close immediately"
    abort_open_socket_task
    stop
  end
  return unless worker_running?

  wait_for_worker_thread
end

#close_driver ⇒ Object



50
51
52
# File 'lib/tamashii/client/base.rb', line 50

# Queues a task (via #post) that calls #close on the WebSocket driver,
# so the close is executed by the worker loop.
def close_driver
  post { @driver.close }
end

#closing? ⇒ Boolean

Returns:

  • (Boolean)


64
65
66
# File 'lib/tamashii/client/base.rb', line 64

# Whether a close has been requested; the flag is set by #close.
def closing?
  @closing
end

#flush_write_buffer ⇒ Object



171
172
173
174
175
176
177
# File 'lib/tamashii/client/base.rb', line 171

# Writes out buffered data until nothing is left or a write cannot complete.
#
# Returns true once both the in-flight chunk (@write_head) and the queue
# are empty, and false as soon as #process_flush reports an incomplete write.
def flush_write_buffer
  until @write_buffer.empty? && @write_head.nil?
    # Only take the next chunk when the previous one was fully written.
    @write_head = @write_buffer.pop if @write_head.nil?
    return false unless process_flush
  end
  true
end

#kill_worker_thread ⇒ Object



114
115
116
117
# File 'lib/tamashii/client/base.rb', line 114

# Forcibly terminates the worker thread and then runs the cleanup with
# normally = false, since the loop in #run never reached its own cleanup.
def kill_worker_thread
  @thread.exit
  worker_cleanup(false)
end

#logger ⇒ Object



17
18
19
# File 'lib/tamashii/client/base.rb', line 17

# Returns the logger provided by Client.logger; used for all log output of this class.
def logger
  Client.logger
end

#on(event, callable = nil, &block) ⇒ Object



105
106
107
108
109
110
111
112
# File 'lib/tamashii/client/base.rb', line 105

# Registers the callback for +event+, replacing any previous one.
#
# The callback may be given either as +callable+ or as a block. When both
# are supplied, a warning is logged and +callable+ wins.
def on(event, callable = nil, &block)
  if callable && block
    logger.warn "Multiple callbacks detected, ignore the block"
  end
  @callbacks[event] = callable || block
end

#open_socket ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/tamashii/client/base.rb', line 132

# Opens a TCP (or TLS, when use_ssl is configured) socket to the configured
# host and port, bounded by the configured opening timeout.
#
# @return [TCPSocket, OpenSSL::SSL::SSLSocket, nil] the connected socket,
#   or nil when the attempt timed out or failed (the reason is logged)
def open_socket
  config = Tamashii::Client.config
  Timeout.timeout(config.opening_timeout) do
    socket = TCPSocket.new(config.host, config.port)
    if config.use_ssl
      ssl_socket = OpenSSL::SSL::SSLSocket.new(socket)
      # Without sync_close, closing the SSL socket leaves the underlying
      # TCP socket open, leaking one descriptor per reconnect.
      ssl_socket.sync_close = true
      ssl_socket.connect
    else
      socket
    end
  end
rescue Timeout::Error
  logger.error "Opening timeout after #{config.opening_timeout} seconds"
  nil
rescue => e
  # Callers only see nil, so record why the attempt failed.
  logger.error "Error when opening socket: #{e.message}"
  nil
end

#open_socket_async ⇒ Object



163
164
165
166
167
168
169
# File 'lib/tamashii/client/base.rb', line 163

# Schedules #open_socket_runner to run after the configured retry interval.
# Nothing is scheduled once the client is closing or stopped; a warning is
# logged instead.
def open_socket_async
  if closing? || stopped?
    logger.warn "Client is closing, no longer need to create socket"
  else
    retry_interval = Tamashii::Client.config.opening_retry_interval
    @open_socket_task = Concurrent::ScheduledTask.execute(retry_interval, &method(:open_socket_runner))
  end
end

#open_socket_runner ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/tamashii/client/base.rb', line 147

# Attempts to open the socket once.
#
# On failure another attempt is scheduled through #open_socket_async. On
# success the :socket_opened callback fires and a task is queued that
# registers the socket with the selector, marks the client as opened and
# starts the WebSocket driver.
def open_socket_runner
  logger.info "Trying to open the socket..."
  @io = open_socket
  unless @io
    logger.error "Cannot open socket, retry later"
    return open_socket_async
  end

  logger.info "Socket opened!"
  call_callback(:socket_opened)
  # Selector registration and driver start-up are queued for the worker loop.
  post do
    @monitor = @nio.register(@io, :r)
    @opened = true
    start_websocket_driver
  end
end

#opened? ⇒ Boolean

Returns:

  • (Boolean)


68
69
70
# File 'lib/tamashii/client/base.rb', line 68

# Whether the connection is currently considered open. The flag is set once
# the socket is registered and when the WebSocket opens, and is cleared in
# #server_gone.
def opened?
  @opened
end

#post(task = nil, &block) ⇒ Object



54
55
56
57
58
# File 'lib/tamashii/client/base.rb', line 54

# Queues a task for the worker loop and wakes the selector so that the
# task is picked up promptly.
#
# @param task [#call, nil] callable to run on the worker thread; takes
#   precedence over the block when both are given
# @yield the task body, used when +task+ is not given
# @raise [ArgumentError] when neither +task+ nor a block is supplied
# @return [Object] whatever #wakeup returns
def post(task = nil, &block)
  task ||= block
  # A nil entry would blow up the worker loop when it calls the queued task.
  raise ArgumentError, "no task given" if task.nil?

  # Enqueue the resolved task; the block alone is nil when a callable was passed.
  @todo << task
  wakeup
end

#process_flush ⇒ Object



179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/tamashii/client/base.rb', line 179

# Tries to write the in-flight chunk (@write_head) without blocking.
#
# @return [Boolean] true when the whole chunk was written (and @write_head
#   was cleared); false when the socket is not ready or only part of the
#   chunk was written (the unwritten remainder stays in @write_head)
def process_flush
  written = @io.write_nonblock(@write_head, exception: false)
  case written
  when @write_head.bytesize
    @write_head = nil
    true
  when :wait_writable, :wait_readable
    # SSL sockets may also report :wait_readable; treating that symbol as a
    # byte count would raise in byteslice, so just retry later.
    false
  else
    # Partial write: keep the bytes that have not been sent yet.
    @write_head = @write_head.byteslice(written, @write_head.bytesize)
    false
  end
end

#read ⇒ Object



250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/tamashii/client/base.rb', line 250

# Reads up to 4096 bytes from the socket without blocking and feeds them to
# the WebSocket driver.
#
# @return [false] when the socket has nothing to deliver yet
# @return [Object] the result of #server_gone when the peer closed the
#   connection or reading raised, otherwise the result of the driver's parse
def read
  incoming = @io.read_nonblock(4096, exception: false)
  case incoming
  when :wait_readable, :wait_writable
    # SSL sockets may report :wait_writable as well; it is a "try again"
    # signal and must not be handed to the driver as payload.
    false
  when nil then server_gone
  else
    @driver.parse(incoming)
  end
rescue => e
  logger.error "Error when reading from server: #{e.message}"
  server_gone
end

#run ⇒ Object



219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/tamashii/client/base.rb', line 219

# Body of the worker thread started in #initialize.
#
# Schedules the first socket-opening attempt, then loops: runs every task
# queued through #post, waits on the selector, and services writable and
# readable monitors. The loop ends once a stop has been requested.
def run
  @worker_running = true
  logger.info "Worker Create!"
  open_socket_async
  loop do
    # A requested stop closes the selector and leaves the loop.
    if stopped?
      @nio.close
      break
    end

    # Drain the task queue before waiting on the selector again.
    @todo.pop(true).call until @todo.empty?

    monitors = @nio.select
    # Nothing ready: loop again so the stop flag and task queue are re-checked.
    next unless monitors
    monitors.each do |monitor|
      if monitor.writable?
        # Once everything buffered is written, go back to watching reads only.
        monitor.interests = :r if flush_write_buffer
      end
      if monitor.readable?
        read
      end
   end
  end
  # Only reached when the loop exits through the stop flag.
  worker_cleanup(true)
end

#server_gone ⇒ Object



263
264
265
266
267
268
269
270
271
272
273
274
275
276
# File 'lib/tamashii/client/base.rb', line 263

# Handles the loss of the connection: marks the client as not opened,
# closes the socket, removes it from the selector and fires the
# :socket_closed callback.
#
# Afterwards the worker is stopped when the client itself asked to close;
# otherwise a new socket-opening attempt is scheduled.
def server_gone
  logger.info "Socket closed"
  @opened = false
  @io.close
  @nio.deregister @io
  call_callback(:socket_closed)
  # Closing requested by the client -> stop; otherwise the server went away -> reconnect.
  closing? ? stop : open_socket_async
end

#start_websocket_driver ⇒ Object



196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/tamashii/client/base.rb', line 196

# Creates the WebSocket client driver on top of this object, wires the
# driver events (:open, :close, :message, :error) to logging and to the
# user callbacks, and starts the driver.
def start_websocket_driver
  @driver = WebSocket::Driver.client(self)

  opened_handler = proc do |_event|
    @opened = true
    logger.info "WebSocket Server opened"
    call_callback(:open)
  end

  closed_handler = proc do |_event|
    logger.info "WebSocket Server closed"
    call_callback(:close)
    server_gone
  end

  message_handler = proc do |event|
    logger.debug("Message from server: #{event.data}")
    call_callback(:message, event.data)
  end

  error_handler = proc do |error|
    logger.error("WebSocket error: #{error.message}")
    call_callback(:error, error)
  end

  @driver.on :open, opened_handler
  @driver.on :close, closed_handler
  @driver.on :message, message_handler
  @driver.on :error, error_handler
  @driver.start
end

#stop ⇒ Object

This is a hard stop; it will not issue a WebSocket close message!



279
280
281
282
# File 'lib/tamashii/client/base.rb', line 279

# Hard stop: raises the stop flag checked by the worker loop and wakes the
# selector so the loop notices it. No WebSocket close message is sent.
def stop
  @stopping = true
  wakeup
end

#stopped? ⇒ Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/tamashii/client/base.rb', line 72

# Whether a hard stop has been requested; the flag is set by #stop.
def stopped?
  @stopping
end

#transmit(data) ⇒ Object

Called by the user.



81
82
83
84
85
86
87
88
89
90
# File 'lib/tamashii/client/base.rb', line 81

# Sends +data+ to the server as a binary WebSocket message.
#
# A String is converted to its array of byte values first; any other value
# is handed to the driver unchanged. The actual send is queued for the
# worker loop.
#
# Returns true when the message was queued, false (after logging an error)
# when the connection is not opened.
def transmit(data)
  unless opened?
    logger.error "Server not opened. Cannot transmit data!"
    return false
  end

  payload = data.is_a?(String) ? data.unpack("C*") : data
  post { @driver.binary(payload) }
  true
end

#wait_for_worker_thread ⇒ Object



119
120
121
122
123
124
# File 'lib/tamashii/client/base.rb', line 119

# Waits up to the configured closing timeout for the worker thread to
# finish and force-kills it when it does not.
#
# When invoked from the worker thread itself (for example when #close is
# called inside a callback) there is nothing to wait for: a thread cannot
# join itself, so the call returns immediately.
#
# @return [nil, Object] nil when the thread finished (or the wait was
#   skipped), otherwise the result of #kill_worker_thread
def wait_for_worker_thread
  if @thread == Thread.current
    # Thread#join raises ThreadError when the target is the current thread.
    logger.debug "Called from worker thread, skip waiting"
    return
  end

  closing_timeout = Tamashii::Client.config.closing_timeout
  return if @thread.join(closing_timeout)

  logger.error "Unable to stop worker thread in #{closing_timeout} second! Force kill the worker thread"
  kill_worker_thread
end

#wakeup ⇒ Object



126
127
128
129
130
# File 'lib/tamashii/client/base.rb', line 126

# Wakes the selector so the worker loop stops waiting and re-checks its
# stop flag and task queue. A failure to wake it up is logged, not raised.
def wakeup
  @nio.wakeup
rescue StandardError
  logger.error "Select cannot be wakeup"
end

#worker_cleanup(normally) ⇒ Object



245
246
247
248
# File 'lib/tamashii/client/base.rb', line 245

# Marks the worker as no longer running and logs how it ended.
#
# @param normally [Boolean] true when the worker loop exited by itself,
#   false when the thread had to be killed
def worker_cleanup(normally)
  @worker_running = false
  logger.debug "Worker terminates #{normally ? 'normally' : 'abnormally'}"
end

#worker_running? ⇒ Boolean

Returns:

  • (Boolean)


76
77
78
# File 'lib/tamashii/client/base.rb', line 76

# Whether the worker loop is running: set to true at the start of #run and
# reset to false by #worker_cleanup.
def worker_running?
  @worker_running
end

#write(data) ⇒ Object

Called by the WebSocket driver.



93
94
95
96
97
98
99
100
101
102
103
# File 'lib/tamashii/client/base.rb', line 93

# Buffers +data+ for sending and queues a task that makes the selector
# watch the socket for writability, so the worker loop flushes the buffer.
# Does nothing to the selector when no monitor is registered yet; an
# EOFError raised while updating the monitor is logged.
def write(data)
  @write_buffer.push(data)
  post do
    begin
      @monitor.interests = :rw unless @monitor.nil?
    rescue EOFError => error
      # Monitor is closed
      logger.error "Error when writing: #{error.message}"
    end
  end
end