Class: Tamashii::Client::Base
- Inherits:
-
Object
- Object
- Tamashii::Client::Base
- Defined in:
- lib/tamashii/client/base.rb
Instance Attribute Summary collapse
-
#url ⇒ Object
readonly
Returns the value of attribute url.
Instance Method Summary collapse
- #abort_open_socket_task ⇒ Object
- #call_callback(event, *args, &block) ⇒ Object
- #close ⇒ Object
- #close_driver ⇒ Object
- #closing? ⇒ Boolean
- #flush_write_buffer ⇒ Object
-
#initialize ⇒ Base
constructor
A new instance of Base.
- #kill_worker_thread ⇒ Object
- #logger ⇒ Object
- #on(event, callable = nil, &block) ⇒ Object
- #open_socket ⇒ Object
- #open_socket_async ⇒ Object
- #open_socket_runner ⇒ Object
- #opened? ⇒ Boolean
- #post(task = nil, &block) ⇒ Object
- #process_flush ⇒ Object
- #read ⇒ Object
- #run ⇒ Object
- #server_gone ⇒ Object
- #start_websocket_driver ⇒ Object
-
#stop ⇒ Object
this is hard stop, will not issue a websocket close message!.
- #stopped? ⇒ Boolean
-
#transmit(data) ⇒ Object
called from user.
- #wait_for_worker_thread ⇒ Object
- #wakeup ⇒ Object
- #worker_cleanup(normally) ⇒ Object
- #worker_running? ⇒ Boolean
-
#write(data) ⇒ Object
called from ws driver.
Constructor Details
#initialize ⇒ Base
Returns a new instance of Base.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/tamashii/client/base.rb', line 21 def initialize entry_point_with_slash = Tamashii::Client.config.entry_point.start_with?("/") ? Tamashii::Client.config.entry_point : "/#{Tamashii::Client.config.entry_point}" @url = "#{Tamashii::Client.config.use_ssl ? "wss" : "ws"}://#{Tamashii::Client.config.host}:#{Tamashii::Client.config.port}#{entry_point_with_slash}" @callbacks = {} @write_head = nil @write_buffer = Queue.new @nio = NIO::Selector.new @todo = Queue.new @stopping = false @closing = false @opened = false @thread = Thread.new {run} end |
Instance Attribute Details
#url ⇒ Object (readonly)
Returns the value of attribute url.
15 16 17 |
# File 'lib/tamashii/client/base.rb', line 15 def url @url end |
Instance Method Details
#abort_open_socket_task ⇒ Object
60 61 62 |
# File 'lib/tamashii/client/base.rb', line 60 def abort_open_socket_task @open_socket_task&.cancel end |
#call_callback(event, *args, &block) ⇒ Object
192 193 194 |
# File 'lib/tamashii/client/base.rb', line 192 def call_callback(event, *args, &block) @callbacks[event]&.call(*args, &block) end |
#close ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/tamashii/client/base.rb', line 38 def close @closing = true if opened? close_driver else logger.info "Closing: server is not connected, close immediately" abort_open_socket_task stop end wait_for_worker_thread if worker_running? end |
#close_driver ⇒ Object
50 51 52 |
# File 'lib/tamashii/client/base.rb', line 50 def close_driver post { @driver.close } end |
#closing? ⇒ Boolean
64 65 66 |
# File 'lib/tamashii/client/base.rb', line 64 def closing? @closing end |
#flush_write_buffer ⇒ Object
171 172 173 174 175 176 177 |
# File 'lib/tamashii/client/base.rb', line 171 def flush_write_buffer loop do return true if @write_buffer.empty? && @write_head.nil? @write_head = @write_buffer.pop if @write_head.nil? return false unless process_flush end end |
#kill_worker_thread ⇒ Object
114 115 116 117 |
# File 'lib/tamashii/client/base.rb', line 114 def kill_worker_thread @thread.exit worker_cleanup(false) end |
#logger ⇒ Object
17 18 19 |
# File 'lib/tamashii/client/base.rb', line 17 def logger Client.logger end |
#on(event, callable = nil, &block) ⇒ Object
105 106 107 108 109 110 111 112 |
# File 'lib/tamashii/client/base.rb', line 105 def on(event, callable = nil, &block) logger.warn "Multiple callbacks detected, ignore the block" if callable && block if callable @callbacks[event] = callable else @callbacks[event] = block end end |
#open_socket ⇒ Object
132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/tamashii/client/base.rb', line 132 def open_socket Timeout::timeout(Tamashii::Client.config.opening_timeout) do if Tamashii::Client.config.use_ssl OpenSSL::SSL::SSLSocket.new(TCPSocket.new(Tamashii::Client.config.host, Tamashii::Client.config.port)).connect else TCPSocket.new(Tamashii::Client.config.host, Tamashii::Client.config.port) end end rescue Timeout::Error => e logger.error "Opening timeout after #{Tamashii::Client.config.opening_timeout} seconds" nil rescue => e nil end |
#open_socket_async ⇒ Object
163 164 165 166 167 168 169 |
# File 'lib/tamashii/client/base.rb', line 163 def open_socket_async if !closing? && !stopped? @open_socket_task = Concurrent::ScheduledTask.execute(Tamashii::Client.config.opening_retry_interval, &method(:open_socket_runner)) else logger.warn "Client is closing, no longer need to create socket" end end |
#open_socket_runner ⇒ Object
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/tamashii/client/base.rb', line 147 def open_socket_runner logger.info "Trying to open the socket..." if @io = open_socket logger.info "Socket opened!" call_callback(:socket_opened) post do @monitor = @nio.register(@io, :r) @opened = true start_websocket_driver end else logger.error "Cannot open socket, retry later" open_socket_async end end |
#opened? ⇒ Boolean
68 69 70 |
# File 'lib/tamashii/client/base.rb', line 68 def opened? @opened end |
#post(task = nil, &block) ⇒ Object
54 55 56 57 58 |
# File 'lib/tamashii/client/base.rb', line 54 def post(task = nil, &block) task ||= block @todo << block wakeup end |
#process_flush ⇒ Object
179 180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/tamashii/client/base.rb', line 179 def process_flush written = @io.write_nonblock(@write_head, exception: false) case written when @write_head.bytesize @write_head = nil return true when :wait_writable then return false else @write_head = @write_head.byteslice(written, @write_head.bytesize) return false end end |
#read ⇒ Object
250 251 252 253 254 255 256 257 258 259 260 261 |
# File 'lib/tamashii/client/base.rb', line 250 def read incoming = @io.read_nonblock(4096, exception: false) case incoming when :wait_readable then false when nil then server_gone else @driver.parse(incoming) end rescue => e logger.error "Error when reading from server: #{e.}" server_gone end |
#run ⇒ Object
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 |
# File 'lib/tamashii/client/base.rb', line 219 def run @worker_running = true logger.info "Worker Create!" open_socket_async loop do if stopped? @nio.close break end @todo.pop(true).call until @todo.empty? monitors = @nio.select next unless monitors monitors.each do |monitor| if monitor.writable? monitor.interests = :r if flush_write_buffer end if monitor.readable? read end end end worker_cleanup(true) end |
#server_gone ⇒ Object
263 264 265 266 267 268 269 270 271 272 273 274 275 276 |
# File 'lib/tamashii/client/base.rb', line 263 def server_gone logger.info "Socket closed" @opened = false @io.close @nio.deregister @io call_callback(:socket_closed) if closing? # client should stop the thread stop else # closing is not issued by client, re-open open_socket_async end end |
#start_websocket_driver ⇒ Object
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
# File 'lib/tamashii/client/base.rb', line 196 def start_websocket_driver @driver = WebSocket::Driver.client(self) @driver.on :open, proc { |e| @opened = true logger.info "WebSocket Server opened" call_callback(:open) } @driver.on :close, proc { |e| logger.info "WebSocket Server closed" call_callback(:close) server_gone } @driver.on :message, proc { |e| logger.debug("Message from server: #{e.data}") call_callback(:message, e.data) } @driver.on :error, proc { |e| logger.error("WebSocket error: #{e.}") call_callback(:error, e) } @driver.start end |
#stop ⇒ Object
this is hard stop, will not issue a websocket close message!
279 280 281 282 |
# File 'lib/tamashii/client/base.rb', line 279 def stop @stopping = true wakeup end |
#stopped? ⇒ Boolean
72 73 74 |
# File 'lib/tamashii/client/base.rb', line 72 def stopped? @stopping end |
#transmit(data) ⇒ Object
called from user
81 82 83 84 85 86 87 88 89 90 |
# File 'lib/tamashii/client/base.rb', line 81 def transmit(data) if opened? data = data.unpack("C*") if data.is_a?(String) post { @driver.binary(data) } true else logger.error "Server not opened. Cannot transmit data!" false end end |
#wait_for_worker_thread ⇒ Object
119 120 121 122 123 124 |
# File 'lib/tamashii/client/base.rb', line 119 def wait_for_worker_thread if !@thread.join(Tamashii::Client.config.closing_timeout) logger.error "Unable to stop worker thread in #{Tamashii::Client.config.closing_timeout} second! Force kill the worker thread" kill_worker_thread end end |
#wakeup ⇒ Object
126 127 128 129 130 |
# File 'lib/tamashii/client/base.rb', line 126 def wakeup @nio.wakeup rescue logger.error "Select cannot be wakeup" end |
#worker_cleanup(normally) ⇒ Object
245 246 247 248 |
# File 'lib/tamashii/client/base.rb', line 245 def worker_cleanup(normally) @worker_running = false logger.debug "Worker terminales #{normally ? 'normally' : 'abnormally'}" end |
#worker_running? ⇒ Boolean
76 77 78 |
# File 'lib/tamashii/client/base.rb', line 76 def worker_running? @worker_running end |
#write(data) ⇒ Object
called from ws driver
93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/tamashii/client/base.rb', line 93 def write(data) @write_buffer << data post do begin @monitor&.interests = :rw rescue EOFError => e # Monitor is closed logger.error "Error when writing: #{e.}" end end end |