Class: Tamashii::Client::Base
- Inherits:
-
Object
- Object
- Tamashii::Client::Base
- Defined in:
- lib/tamashii/client/base.rb
Instance Attribute Summary collapse
-
#url ⇒ Object
readonly
Returns the value of attribute url.
Instance Method Summary collapse
- #abort_open_socket_task ⇒ Object
- #call_callback(event, *args, &block) ⇒ Object
- #close ⇒ Object
- #close_driver ⇒ Object
- #closing? ⇒ Boolean
- #flush_write_buffer ⇒ Object
-
#initialize ⇒ Base
constructor
A new instance of Base.
- #kill_worker_thread ⇒ Object
- #logger ⇒ Object
- #on(event, callable = nil, &block) ⇒ Object
- #open_socket ⇒ Object
- #open_socket_async ⇒ Object
- #open_socket_runner ⇒ Object
- #opened? ⇒ Boolean
- #post(task = nil, &block) ⇒ Object
- #process_flush ⇒ Object
- #read ⇒ Object
- #run ⇒ Object
- #server_gone ⇒ Object
- #start_websocket_driver ⇒ Object
-
#stop ⇒ Object
this is hard stop, will not issue a websocket close message!.
- #stopped? ⇒ Boolean
-
#transmit(data) ⇒ Object
called from user.
- #wait_for_worker_thread ⇒ Object
- #wakeup ⇒ Object
- #worker_cleanup(normally) ⇒ Object
- #worker_running? ⇒ Boolean
-
#write(data) ⇒ Object
called from ws driver.
Constructor Details
#initialize ⇒ Base
Returns a new instance of Base.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/tamashii/client/base.rb', line 22 def initialize entry_point_with_slash = Config.entry_point.start_with?("/") ? Config.entry_point : "/#{Config.entry_point}" @url = "#{Config.use_ssl ? "wss" : "ws"}://#{Config.host}:#{Config.port}#{entry_point_with_slash}" @callbacks = {} @write_head = nil @write_buffer = Queue.new @nio = NIO::Selector.new @todo = Queue.new @stopping = false @closing = false @opened = false @thread = Thread.new {run} end |
Instance Attribute Details
#url ⇒ Object (readonly)
Returns the value of attribute url.
16 17 18 |
# File 'lib/tamashii/client/base.rb', line 16 def url @url end |
Instance Method Details
#abort_open_socket_task ⇒ Object
61 62 63 |
# File 'lib/tamashii/client/base.rb', line 61 def abort_open_socket_task @open_socket_task&.cancel end |
#call_callback(event, *args, &block) ⇒ Object
193 194 195 |
# File 'lib/tamashii/client/base.rb', line 193 def call_callback(event, *args, &block) @callbacks[event]&.call(*args, &block) end |
#close ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/tamashii/client/base.rb', line 39 def close @closing = true if opened? close_driver else logger.info "Closing: server is not connected, close immediately" abort_open_socket_task stop end wait_for_worker_thread if worker_running? end |
#close_driver ⇒ Object
51 52 53 |
# File 'lib/tamashii/client/base.rb', line 51 def close_driver post { @driver.close } end |
#closing? ⇒ Boolean
65 66 67 |
# File 'lib/tamashii/client/base.rb', line 65 def closing? @closing end |
#flush_write_buffer ⇒ Object
172 173 174 175 176 177 178 |
# File 'lib/tamashii/client/base.rb', line 172 def flush_write_buffer loop do return true if @write_buffer.empty? && @write_head.nil? @write_head = @write_buffer.pop if @write_head.nil? return false unless process_flush end end |
#kill_worker_thread ⇒ Object
115 116 117 118 |
# File 'lib/tamashii/client/base.rb', line 115 def kill_worker_thread @thread.exit worker_cleanup(false) end |
#logger ⇒ Object
18 19 20 |
# File 'lib/tamashii/client/base.rb', line 18 def logger Client.logger end |
#on(event, callable = nil, &block) ⇒ Object
106 107 108 109 110 111 112 113 |
# File 'lib/tamashii/client/base.rb', line 106 def on(event, callable = nil, &block) logger.warn "Multiple callbacks detected, ignore the block" if callable && block if callable @callbacks[event] = callable else @callbacks[event] = block end end |
#open_socket ⇒ Object
133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/tamashii/client/base.rb', line 133 def open_socket Timeout::timeout(Config.opening_timeout) do if Config.use_ssl OpenSSL::SSL::SSLSocket.new(TCPSocket.new(Config.host, Config.port)).connect else TCPSocket.new(Config.host, Config.port) end end rescue Timeout::Error => e logger.error "Opening timeout after #{Config.opening_timeout} seconds" nil rescue => e nil end |
#open_socket_async ⇒ Object
164 165 166 167 168 169 170 |
# File 'lib/tamashii/client/base.rb', line 164 def open_socket_async if !closing? && !stopped? @open_socket_task = Concurrent::ScheduledTask.execute(Config.opening_retry_interval, &method(:open_socket_runner)) else logger.warn "Client is closing, no longer need to create socket" end end |
#open_socket_runner ⇒ Object
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/tamashii/client/base.rb', line 148 def open_socket_runner logger.info "Trying to open the socket..." if @io = open_socket logger.info "Socket opened!" call_callback(:socket_opened) post do @monitor = @nio.register(@io, :r) @opened = true start_websocket_driver end else logger.error "Cannot open socket, retry later" open_socket_async end end |
#opened? ⇒ Boolean
69 70 71 |
# File 'lib/tamashii/client/base.rb', line 69 def opened? @opened end |
#post(task = nil, &block) ⇒ Object
55 56 57 58 59 |
# File 'lib/tamashii/client/base.rb', line 55 def post(task = nil, &block) task ||= block @todo << block wakeup end |
#process_flush ⇒ Object
180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/tamashii/client/base.rb', line 180 def process_flush written = @io.write_nonblock(@write_head, exception: false) case written when @write_head.bytesize @write_head = nil return true when :wait_writable then return false else @write_head = @write_head.byteslice(written, @write_head.bytesize) return false end end |
#read ⇒ Object
251 252 253 254 255 256 257 258 259 260 261 |
# File 'lib/tamashii/client/base.rb', line 251 def read incoming = @io.read_nonblock(4096, exception: false) case incoming when :wait_readable then false when nil then server_gone else @driver.parse(incoming) end rescue server_gone end |
#run ⇒ Object
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 |
# File 'lib/tamashii/client/base.rb', line 220 def run @worker_running = true logger.info "Worker Create!" open_socket_async loop do if stopped? @nio.close break end @todo.pop(true).call until @todo.empty? monitors = @nio.select next unless monitors monitors.each do |monitor| if monitor.writable? monitor.interests = :r if flush_write_buffer end if monitor.readable? read end end end worker_cleanup(true) end |
#server_gone ⇒ Object
263 264 265 266 267 268 269 270 271 272 273 274 275 276 |
# File 'lib/tamashii/client/base.rb', line 263 def server_gone logger.info "Socket closed" @opened = false @io.close @nio.deregister @io call_callback(:socket_closed) if closing? # client should stop the thread stop else # closing is not issued by client, re-open open_socket_async end end |
#start_websocket_driver ⇒ Object
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 |
# File 'lib/tamashii/client/base.rb', line 197 def start_websocket_driver @driver = WebSocket::Driver.client(self) @driver.on :open, proc { |e| @opened = true logger.info "WebSocket Server opened" call_callback(:open) } @driver.on :close, proc { |e| logger.info "WebSocket Server closed" call_callback(:close) server_gone } @driver.on :message, proc { |e| logger.debug("Message from server: #{e.data}") call_callback(:message, e.data) } @driver.on :error, proc { |e| logger.error("WebSocket error: #{e.}") call_callback(:error, e) } @driver.start end |
#stop ⇒ Object
this is hard stop, will not issue a websocket close message!
279 280 281 282 |
# File 'lib/tamashii/client/base.rb', line 279 def stop @stopping = true wakeup end |
#stopped? ⇒ Boolean
73 74 75 |
# File 'lib/tamashii/client/base.rb', line 73 def stopped? @stopping end |
#transmit(data) ⇒ Object
called from user
82 83 84 85 86 87 88 89 90 91 |
# File 'lib/tamashii/client/base.rb', line 82 def transmit(data) if opened? data = data.unpack("C*") if data.is_a?(String) post { @driver.binary(data) } true else logger.error "Server not opened. Cannot transmit data!" false end end |
#wait_for_worker_thread ⇒ Object
120 121 122 123 124 125 |
# File 'lib/tamashii/client/base.rb', line 120 def wait_for_worker_thread if !@thread.join(Config.closing_timeout) logger.error "Unable to stop worker thread in #{Config.closing_timeout} second! Force kill the worker thread" kill_worker_thread end end |
#wakeup ⇒ Object
127 128 129 130 131 |
# File 'lib/tamashii/client/base.rb', line 127 def wakeup @nio.wakeup rescue logger.error "Select cannot be wakeup" end |
#worker_cleanup(normally) ⇒ Object
246 247 248 249 |
# File 'lib/tamashii/client/base.rb', line 246 def worker_cleanup(normally) @worker_running = false logger.debug "Worker terminales #{normally ? 'normally' : 'abnormally'}" end |
#worker_running? ⇒ Boolean
77 78 79 |
# File 'lib/tamashii/client/base.rb', line 77 def worker_running? @worker_running end |
#write(data) ⇒ Object
called from ws driver
94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/tamashii/client/base.rb', line 94 def write(data) @write_buffer << data post do begin @monitor&.interests = :rw rescue EOFError => e # Monitor is closed logger.error "Error when writing: #{e.}" end end end |