Class: NATS::IO::Client
- Inherits:
-
Object
- Object
- NATS::IO::Client
- Includes:
- MonitorMixin
- Defined in:
- lib/nats/io/client.rb
Instance Attribute Summary collapse
-
#connected_server ⇒ Object
readonly
Returns the value of attribute connected_server.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#server_info ⇒ Object
readonly
Returns the value of attribute server_info.
-
#server_pool ⇒ Object
(also: #servers)
readonly
Returns the value of attribute server_pool.
-
#stats ⇒ Object
readonly
Returns the value of attribute stats.
-
#status ⇒ Object
readonly
Returns the value of attribute status.
-
#uri ⇒ Object
readonly
Returns the value of attribute uri.
Instance Method Summary collapse
-
#close ⇒ Object
Closes the connection to NATS, first flushing any pending messages if the connection is still alive. Should not be called while holding the lock.
- #closed? ⇒ Boolean
-
#connect(opts = {}) ⇒ Object
Establishes connection to NATS.
- #connected? ⇒ Boolean
- #connecting? ⇒ Boolean
- #discovered_servers ⇒ Object
-
#flush(timeout = 60) ⇒ Object
Send a ping and wait for a pong back within a timeout.
-
#initialize ⇒ Client
constructor
A new instance of Client.
- #last_error ⇒ Object
- #new_inbox ⇒ Object
- #on_close(&callback) ⇒ Object
- #on_disconnect(&callback) ⇒ Object
- #on_error(&callback) ⇒ Object
- #on_reconnect(&callback) ⇒ Object
-
#process_err(err) ⇒ Object
Handles protocol errors being sent by the server.
- #process_msg(subject, sid, reply, data) ⇒ Object
-
#process_ping ⇒ Object
Received a ping so respond back with a pong.
-
#process_pong ⇒ Object
Methods only used by the parser.
- #publish(subject, msg = EMPTY_MSG, opt_reply = nil, &blk) ⇒ Object
- #reconnecting? ⇒ Boolean
-
#request(subject, payload, opts = {}, &blk) ⇒ Object
Sends a request expecting a single response, or raises a timeout if no response is received within the specified deadline.
-
#subscribe(subject, opts = {}, &callback) ⇒ Object
Creates a subscription whose messages are dispatched asynchronously to a callback.
-
#unsubscribe(sid, opt_max = nil) ⇒ Object
Sends an UNSUB command so that the server auto-unsubscribes, and discards the local subscription if it is present and has already received enough messages.
Constructor Details
#initialize ⇒ Client
Returns a new instance of Client.
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/nats/io/client.rb', line 86
#
# Builds a client that is not connected yet: the status starts out as
# DISCONNECTED and the socket, queues and worker thread handles are nil.
#
# @return [Client]
def initialize
  # MonitorMixin's initializer has to run so that the monitor is usable.
  super

  # Connection options.
  @options = nil

  # Read/write IO handle.
  @io = nil

  # Queues used to coalesce the commands that must be written to the server.
  @flush_queue = @pending_queue = nil

  # Stateful protocol parser bound to this client.
  @parser = NATS::Protocol::Parser.new(self)

  # Handles of the threads that read and flush commands.
  @flusher_thread = @read_loop_thread = @ping_interval_thread = nil

  # Info received from the server.
  @server_info = {}

  # URI of the server we are currently connected to, and the known servers.
  @uri = nil
  @server_pool = []
  @status = DISCONNECTED

  # Subscriptions keyed by sid, plus the last sid that was handed out.
  @subs = {}
  @ssid = 0

  # Ping bookkeeping; the list of pending pongs carries its own monitor.
  @pings_outstanding = @pongs_received = 0
  @pongs = [].extend(MonitorMixin)

  # Accounting.
  @pending_size = 0
  @stats = {
    in_msgs: 0,
    out_msgs: 0,
    in_bytes: 0,
    out_bytes: 0,
    reconnects: 0
  }

  # Sticky error.
  @last_err = nil

  # Async callbacks; by default errors are re-raised and the rest are no-ops.
  @err_cb = proc { |e| raise e }
  @close_cb = proc {}
  @disconnect_cb = proc {}
  @reconnect_cb = proc {}

  # Secure TLS options.
  @tls = nil
end
Instance Attribute Details
#connected_server ⇒ Object (readonly)
Returns the value of attribute connected_server.
84 85 86 |
# File 'lib/nats/io/client.rb', line 84
#
# Read-only accessor.
#
# @return [Object] the current value of @connected_server
def connected_server
  @connected_server
end
#options ⇒ Object (readonly)
Returns the value of attribute options.
84 85 86 |
# File 'lib/nats/io/client.rb', line 84
#
# Read-only accessor for the options hash stored in @options.
#
# @return [Object] the current value of @options
def options
  @options
end
#server_info ⇒ Object (readonly)
Returns the value of attribute server_info.
84 85 86 |
# File 'lib/nats/io/client.rb', line 84
#
# Read-only accessor.
#
# @return [Object] the current value of @server_info
def server_info
  @server_info
end
#server_pool ⇒ Object (readonly) Also known as: servers
Returns the value of attribute server_pool.
84 85 86 |
# File 'lib/nats/io/client.rb', line 84
#
# Read-only accessor (the documentation above also lists it as #servers).
#
# @return [Object] the current value of @server_pool
def server_pool
  @server_pool
end
#stats ⇒ Object (readonly)
Returns the value of attribute stats.
84 85 86 |
# File 'lib/nats/io/client.rb', line 84
#
# Read-only accessor.
#
# @return [Object] the current value of @stats
def stats
  @stats
end
#status ⇒ Object (readonly)
Returns the value of attribute status.
84 85 86 |
# File 'lib/nats/io/client.rb', line 84
#
# Read-only accessor.
#
# @return [Object] the current value of @status
def status
  @status
end
#uri ⇒ Object (readonly)
Returns the value of attribute uri.
84 85 86 |
# File 'lib/nats/io/client.rb', line 84
#
# Read-only accessor.
#
# @return [Object] the current value of @uri
def uri
  @uri
end
Instance Method Details
#close ⇒ Object
Closes the connection to NATS, first flushing any pending messages if the connection is still alive. Should not be called while holding the lock.
453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 |
# File 'lib/nats/io/client.rb', line 453
#
# Closes the connection to NATS. Marks the client CLOSED, stops the worker
# threads, wakes every pending pong waiter, writes whatever is still in the
# pending queue if the socket is open, invokes the disconnect and close
# callbacks, and finally closes the socket. Calling it again once the client
# is CLOSED is a no-op. Should not be used while holding the lock.
#
# Safe to call on a client that never connected (queues and threads are nil).
# When called from one of the worker threads, that thread is terminated only
# after the cleanup has completed, so the callbacks and the socket close are
# not skipped.
#
# @return [nil]
def close
  synchronize do
    return if @status == CLOSED
    @status = CLOSED
  end

  # Kick the flusher so it bails due to closed state. The queue only exists
  # once a connection has been set up.
  @flush_queue << :fallout if @flush_queue
  Thread.pass

  # FIXME: More graceful way of handling the following?
  # Ensure ping interval, flusher and read loop are not running anymore.
  # The calling thread is skipped here: Thread#exit on the current thread
  # would terminate it immediately and abandon the rest of the cleanup.
  workers = [@ping_interval_thread, @flusher_thread, @read_loop_thread].compact
  closing_from_worker = workers.include?(Thread.current)
  workers.each do |worker|
    worker.exit if worker.alive? && worker != Thread.current
  end

  # TODO: Delete any other state which we are not using here too.
  synchronize do
    # Wake up anybody still blocked waiting for a pong.
    @pongs.synchronize do
      @pongs.each do |pong|
        pong.signal
      end
      @pongs.clear
    end

    # Try to write any pending flushes in case
    # we have a connection then close it.
    if @io && !@io.closed?
      begin
        cmds = []
        cmds << @pending_queue.pop until @pending_queue.empty?

        # FIXME: Fails when empty on TLS connection?
        @io.write(cmds.join) unless cmds.empty?
      rescue => e
        @last_err = e
        @err_cb.call(e) if @err_cb
      end
    end

    # TODO: Destroy any remaining subscriptions
    @disconnect_cb.call if @disconnect_cb
    @close_cb.call if @close_cb

    # Close the established connection in case
    # we still have it.
    if @io
      @io.close
      @io = nil
    end
  end

  # Preserve the original outcome for worker callers (the worker does not
  # keep running after close), but only once everything has been released.
  Thread.current.exit if closing_from_worker

  nil
end
#closed? ⇒ Boolean
524 525 526 |
# File 'lib/nats/io/client.rb', line 524
#
# @return [Boolean] true when the current status equals CLOSED
def closed?
  CLOSED == @status
end
#connect(opts = {}) ⇒ Object
Establishes connection to NATS
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 |
# File 'lib/nats/io/client.rb', line 148
#
# Establishes connection to NATS.
#
# Options left unset (nil) are filled with defaults, and the NATS_*
# environment variables, when present, override both. The resulting hash is
# stored as @options (the hash passed in is updated in place). Each entry of
# opts[:servers] (or DEFAULT_URI) is added to the server pool, in random
# order unless opts[:dont_randomize_servers] is set. The caller's :servers
# array itself is not reordered, so a frozen array is accepted.
#
# Servers are tried one after another; after a failure the error is recorded,
# the error callback is invoked and, unless should_not_reconnect? says
# otherwise, the next attempt happens after :reconnect_time_wait.
#
# @param opts [Hash] :servers, :dont_randomize_servers, :tls, :verbose,
#   :pedantic, :reconnect, :reconnect_time_wait, :max_reconnect_attempts,
#   :ping_interval, :max_outstanding_pings
# @raise [NoServersError] re-raised after the disconnect callback is invoked
# @raise [StandardError] the connection error, when should_not_reconnect? is true
def connect(opts={})
  opts[:verbose] = false if opts[:verbose].nil?
  opts[:pedantic] = false if opts[:pedantic].nil?
  opts[:reconnect] = true if opts[:reconnect].nil?
  opts[:reconnect_time_wait] = RECONNECT_TIME_WAIT if opts[:reconnect_time_wait].nil?
  opts[:max_reconnect_attempts] = MAX_RECONNECT_ATTEMPTS if opts[:max_reconnect_attempts].nil?
  opts[:ping_interval] = DEFAULT_PING_INTERVAL if opts[:ping_interval].nil?
  opts[:max_outstanding_pings] = DEFAULT_PING_MAX if opts[:max_outstanding_pings].nil?

  # Override with ENV
  opts[:verbose] = ENV['NATS_VERBOSE'].downcase == 'true' unless ENV['NATS_VERBOSE'].nil?
  opts[:pedantic] = ENV['NATS_PEDANTIC'].downcase == 'true' unless ENV['NATS_PEDANTIC'].nil?
  opts[:reconnect] = ENV['NATS_RECONNECT'].downcase == 'true' unless ENV['NATS_RECONNECT'].nil?
  opts[:reconnect_time_wait] = ENV['NATS_RECONNECT_TIME_WAIT'].to_i unless ENV['NATS_RECONNECT_TIME_WAIT'].nil?
  opts[:max_reconnect_attempts] = ENV['NATS_MAX_RECONNECT_ATTEMPTS'].to_i unless ENV['NATS_MAX_RECONNECT_ATTEMPTS'].nil?
  opts[:ping_interval] = ENV['NATS_PING_INTERVAL'].to_i unless ENV['NATS_PING_INTERVAL'].nil?
  opts[:max_outstanding_pings] = ENV['NATS_MAX_OUTSTANDING_PINGS'].to_i unless ENV['NATS_MAX_OUTSTANDING_PINGS'].nil?
  @options = opts

  # Process servers in the NATS cluster and pick one to connect.
  # Shuffle into a new array so the caller's list is never mutated.
  uris = opts[:servers] || [DEFAULT_URI]
  uris = uris.shuffle unless @options[:dont_randomize_servers]
  uris.each do |u|
    @server_pool << { :uri => u.is_a?(URI) ? u.dup : URI.parse(u) }
  end

  # Check for TLS usage
  @tls = @options[:tls]

  begin
    current = select_next_server

    # Create TCP socket connection to NATS
    @io = create_socket
    @io.connect

    # Capture state that we have had a TCP connection established against
    # this server and could potentially be used for reconnecting.
    current[:was_connected] = true

    # Connection established and now in process of sending CONNECT to NATS
    @status = CONNECTING

    # Established TCP connection successfully so can start connect
    process_connect_init

    # Reset reconnection attempts if connection is valid
    current[:reconnect_attempts] = 0
  rescue NoServersError => e
    @disconnect_cb.call(e) if @disconnect_cb
    raise e
  rescue => e
    # Capture sticky error
    synchronize { @last_err = e }

    @err_cb.call(e) if @err_cb

    if should_not_reconnect?
      @disconnect_cb.call(e) if @disconnect_cb
      raise e
    end

    # always sleep here to safe guard against errors before current[:was_connected]
    # is set for the first time
    sleep @options[:reconnect_time_wait] if @options[:reconnect_time_wait]

    # Continue retrying until there are no options left in the server pool
    retry
  end

  # Initialize queues and loops for message dispatching and processing engine
  @flush_queue = SizedQueue.new(MAX_FLUSH_KICK_SIZE)
  @pending_queue = SizedQueue.new(MAX_PENDING_SIZE)
  @pings_outstanding = 0
  @pongs_received = 0
  @pending_size = 0

  # Server roundtrip went ok so consider to be connected at this point
  @status = CONNECTED

  # Connected to NATS so Ready to start parser loop, flusher and ping interval
  start_threads!
end
#connected? ⇒ Boolean
512 513 514 |
# File 'lib/nats/io/client.rb', line 512
#
# @return [Boolean] true when the current status equals CONNECTED
def connected?
  CONNECTED == @status
end
#connecting? ⇒ Boolean
516 517 518 |
# File 'lib/nats/io/client.rb', line 516
#
# @return [Boolean] true when the current status equals CONNECTING
def connecting?
  CONNECTING == @status
end
#discovered_servers ⇒ Object
354 355 356 |
# File 'lib/nats/io/client.rb', line 354
#
# Filters the server list down to the entries whose :discovered value is
# truthy.
#
# @return [Array] the matching entries of #servers
def discovered_servers
  servers.select do |server|
    server[:discovered]
  end
end
#flush(timeout = 60) ⇒ Object
Send a ping and wait for a pong back within a timeout.
336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 |
# File 'lib/nats/io/client.rb', line 336
#
# Send a ping and wait for a pong back within a timeout.
#
# A condition variable is registered in @pongs, a PING is queued and the
# flusher is kicked; the caller then blocks on the condition under
# with_nats_timeout. The condition is always removed from @pongs on the way
# out, so a flush that times out (or fails while queueing) does not leave a
# stale waiter behind to absorb a later PONG.
#
# @param timeout [Numeric] seconds to wait for the PONG (default 60)
# @return [Object] whatever with_nats_timeout returns
# @raise whatever with_nats_timeout raises when the deadline is missed
def flush(timeout=60)
  # Schedule sending a PING, and block until we receive PONG back,
  # or raise a timeout in case the response is past the deadline.
  pong = @pongs.new_cond
  @pongs.synchronize do
    @pongs << pong

    begin
      # Flush once pong future has been prepared
      @pending_queue << PING_REQUEST
      @flush_queue << :ping
      with_nats_timeout(timeout) do
        pong.wait(timeout)
      end
    ensure
      # No-op when the waiter has already been taken out of the list.
      @pongs.delete(pong)
    end
  end
end
#last_error ⇒ Object
544 545 546 547 548 |
# File 'lib/nats/io/client.rb', line 544
#
# Reads the sticky error while holding the client lock.
#
# @return [Object, nil] the current value of @last_err
def last_error
  synchronize { @last_err }
end
#new_inbox ⇒ Object
504 505 506 |
# File 'lib/nats/io/client.rb', line 504
#
# Generates a fresh inbox subject: the "_INBOX." prefix followed by 13 random
# bytes rendered as hex.
#
# @return [String]
def new_inbox
  token = SecureRandom.hex(13)
  "_INBOX.#{token}"
end
#on_close(&callback) ⇒ Object
540 541 542 |
# File 'lib/nats/io/client.rb', line 540
#
# Registers the block stored in @close_cb, replacing any previous one.
#
# @return [Proc, nil] the block that was given
def on_close(&callback)
  @close_cb = callback
end
#on_disconnect(&callback) ⇒ Object
532 533 534 |
# File 'lib/nats/io/client.rb', line 532
#
# Registers the block stored in @disconnect_cb, replacing any previous one.
#
# @return [Proc, nil] the block that was given
def on_disconnect(&callback)
  @disconnect_cb = callback
end
#on_error(&callback) ⇒ Object
528 529 530 |
# File 'lib/nats/io/client.rb', line 528
#
# Registers the block stored in @err_cb, replacing any previous one.
#
# @return [Proc, nil] the block that was given
def on_error(&callback)
  @err_cb = callback
end
#on_reconnect(&callback) ⇒ Object
536 537 538 |
# File 'lib/nats/io/client.rb', line 536
#
# Registers the block stored in @reconnect_cb, replacing any previous one.
#
# @return [Proc, nil] the block that was given
def on_reconnect(&callback)
  @reconnect_cb = callback
end
#process_err(err) ⇒ Object
Handles protocol errors being sent by the server.
379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 |
# File 'lib/nats/io/client.rb', line 379
#
# Handles protocol errors being sent by the server: flags the first server
# in the pool as having received an error, reports the error through the
# error callback (as an AuthError when that server is marked
# :auth_required, otherwise as a ServerError), records a ServerError as the
# sticky error and closes the connection.
#
# @param err [Object] error payload handed over by the parser
def process_err(err)
  # FIXME: In case of a stale connection, then handle as process_op_error

  # In case of permissions violation then dispatch the error callback.
  # NOTE(review): the original comment says "while holding the lock", but no
  # lock is taken around the callback here.
  server = server_pool.first
  server[:error_received] = true

  error_class = server[:auth_required] ? NATS::IO::AuthError : NATS::IO::ServerError
  @err_cb.call(error_class.new(err))

  # Otherwise, capture the error under a lock and close
  # the connection gracefully.
  synchronize { @last_err = NATS::IO::ServerError.new(err) }

  close
end
#process_msg(subject, sid, reply, data) ⇒ Object
401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 |
# File 'lib/nats/io/client.rb', line 401
#
# Dispatches one incoming message to the subscription registered under +sid+.
# Messages for an unknown sid are dropped after the stats are updated.
#
# @param subject [String] subject the message was published on
# @param sid [Object] subscription id used as the key into @subs
# @param reply [String, nil] reply subject, if any
# @param data [String] message payload
def process_msg(subject, sid, reply, data)
  # Accounting
  @stats[:in_msgs] += 1
  @stats[:in_bytes] += data.size

  # Throw away in case we no longer manage the subscription
  sub = synchronize { @subs[sid] }
  return unless sub

  sub.synchronize do
    sub.received += 1

    # Check for auto_unsubscribe
    if sub.max
      if sub.received > sub.max
        # Client side support in case server did not receive unsubscribe
        unsubscribe(sid)
        return
      elsif sub.received == sub.max
        # Cleanup here if we have hit the max..
        @subs.delete(sid)
      end
    end

    # In case of a request which requires a future
    # do so here already while holding the lock and return
    future = sub.future
    if future
      sub.response = Msg.new(subject, reply, data)
      future.signal
      return
    end
  end

  # Distinguish between async subscriptions with callbacks
  # and request subscriptions which expect a single response.
  handler = sub.callback
  return unless handler

  # Pass only as many arguments as the callback declares.
  case handler.arity
  when 0 then handler.call
  when 1 then handler.call(data)
  when 2 then handler.call(data, reply)
  else handler.call(data, reply, subject)
  end
end
#process_ping ⇒ Object
Received a ping so respond back with a pong
371 372 373 374 375 376 |
# File 'lib/nats/io/client.rb', line 371
#
# Received a ping so respond back with a pong: queues PONG_RESPONSE and
# kicks the flusher.
def process_ping
  @pending_queue << PONG_RESPONSE
  @flush_queue << :ping

  # NOTE(review): a waiter is also appended to @pongs here although nothing
  # visible in this file waits on it; confirm whether this is intentional,
  # since process_pong hands PONGs to entries of that list.
  waiter = @pongs.new_cond
  @pongs.synchronize do
    @pongs << waiter
  end
end
#process_pong ⇒ Object
Methods only used by the parser
360 361 362 363 364 365 366 367 368 |
# File 'lib/nats/io/client.rb', line 360
#
# Called by the parser when a PONG arrives: wakes one pending waiter, if
# there is any, and updates the ping counters.
def process_pong
  # Take a pong waiter and signal any flush in case there was one.
  # NOTE(review): Array#pop takes the most recently added waiter, while
  # #flush appends; confirm the intended ordering.
  @pongs.synchronize do
    waiter = @pongs.pop
    waiter.signal unless waiter.nil?
  end

  @pings_outstanding -= 1
  @pongs_received += 1
end
#publish(subject, msg = EMPTY_MSG, opt_reply = nil, &blk) ⇒ Object
232 233 234 235 236 237 238 239 240 241 242 |
# File 'lib/nats/io/client.rb', line 232
#
# Publishes +msg+ on +subject+: updates the outbound stats, sends a PUB
# command and kicks the flusher when its queue is empty.
#
# @param subject [String] subject to publish on
# @param msg [String] payload (defaults to EMPTY_MSG)
# @param opt_reply [String, nil] optional reply subject
# @raise [BadSubject] when subject is nil/false or empty
def publish(subject, msg=EMPTY_MSG, opt_reply=nil, &blk)
  raise BadSubject unless subject && !subject.empty?

  payload_size = msg.bytesize

  # Accounting
  @stats[:out_msgs] += 1
  @stats[:out_bytes] += payload_size

  command = "PUB #{subject} #{opt_reply} #{payload_size}\r\n#{msg}\r\n"
  send_command(command)

  # Only kick the flusher when nothing is queued for it already.
  @flush_queue << :pub if @flush_queue.empty?
end
#reconnecting? ⇒ Boolean
520 521 522 |
# File 'lib/nats/io/client.rb', line 520
#
# @return [Boolean] true when the current status equals RECONNECTING
def reconnecting?
  RECONNECTING == @status
end
#request(subject, payload, opts = {}, &blk) ⇒ Object
Sends a request expecting a single response, or raises a timeout if no response is received within the specified deadline. If given a callback, the request is handled asynchronously.
267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 |
# File 'lib/nats/io/client.rb', line 267
#
# Sends a request on +subject+ using a fresh inbox as the reply subject.
#
# With a block, the inbox is subscribed (opts[:max] defaults to 1), the
# request is published and the sid is returned right away; the block is
# called with as many of (msg, reply) as its arity asks for.
#
# Without a block, the call blocks until a response arrives or
# with_nats_timeout gives up (opts[:timeout], default 0.5 seconds). The
# one-shot subscription is always removed from @subs on the way out, so a
# request that times out or fails does not leave it registered.
#
# @param subject [String, nil] request subject; nil makes this a no-op
# @param payload [String] request payload
# @param opts [Hash] :timeout for the synchronous form; passed on to
#   #subscribe for the asynchronous form
# @return [Object, nil] the sid (async), the response stored on the
#   subscription (sync), or nil when subject is nil
# @raise whatever with_nats_timeout raises when the deadline is missed
def request(subject, payload, opts={}, &blk)
  return unless subject
  inbox = new_inbox

  # If a callback was passed, then have it process
  # the messages asynchronously and return the sid.
  if blk
    opts[:max] ||= 1
    s = subscribe(inbox, opts) do |msg, reply|
      case blk.arity
      when 0 then blk.call
      when 1 then blk.call(msg)
      else blk.call(msg, reply)
      end
    end
    publish(subject, payload, inbox)

    return s
  end

  # In case block was not given, handle synchronously
  # with a timeout and only allow a single response.
  timeout = opts[:timeout] ||= 0.5
  opts[:max] = 1

  sub = Subscription.new
  sub.subject = inbox
  sub.received = 0
  future = sub.new_cond
  sub.future = future

  sid = nil
  synchronize do
    sid = (@ssid += 1)
    @subs[sid] = sub
  end

  begin
    send_command("SUB #{inbox} #{sid}#{CR_LF}")
    @flush_queue << :sub
    unsubscribe(sid, 1)

    sub.synchronize do
      # Publish the request and then wait for the response...
      publish(subject, payload, inbox)

      with_nats_timeout(timeout) do
        future.wait(timeout)
      end
    end
  ensure
    # Drop the subscription if it is still registered (timeout or error);
    # a no-op once the response has been delivered and it was cleaned up.
    synchronize { @subs.delete(sid) }
  end

  sub.response
end
#subscribe(subject, opts = {}, &callback) ⇒ Object
Creates a subscription whose messages are dispatched asynchronously to a callback.
246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 |
# File 'lib/nats/io/client.rb', line 246
#
# Creates a subscription on +subject+ whose messages are handed to
# +callback+, sends the SUB command and kicks the flusher.
#
# The sid is allocated and the subscription is registered in @subs while
# holding the client lock, the same way #request does it, so concurrent
# callers cannot end up with the same sid.
#
# @param subject [String] subject to subscribe to
# @param opts [Hash] :queue (queue group) and :max (auto-unsubscribe after
#   that many messages)
# @return [Integer] the sid of the new subscription
def subscribe(subject, opts={}, &callback)
  sub = Subscription.new
  sub.subject = subject
  sub.callback = callback
  sub.received = 0
  sub.queue = opts[:queue] if opts[:queue]
  sub.max = opts[:max] if opts[:max]

  sid = nil
  synchronize do
    sid = (@ssid += 1)
    @subs[sid] = sub
  end

  send_command("SUB #{subject} #{opts[:queue]} #{sid}#{CR_LF}")
  @flush_queue << :sub

  # Setup server support for auto-unsubscribe when receiving enough messages
  unsubscribe(sid, opts[:max]) if opts[:max]

  sid
end
#unsubscribe(sid, opt_max = nil) ⇒ Object
Sends an UNSUB command so that the server auto-unsubscribes, and discards the local subscription if it is present and has already received enough messages.
323 324 325 326 327 328 329 330 331 332 333 |
# File 'lib/nats/io/client.rb', line 323
#
# Sends UNSUB for +sid+ (with the optional message limit) and kicks the
# flusher. If the subscription is known locally, its max is set to
# +opt_max+ and it is dropped from @subs unless it still has messages left
# to receive under that limit.
#
# @param sid [Object] subscription id
# @param opt_max [Integer, nil] number of messages after which the
#   subscription should go away; nil removes it right away
def unsubscribe(sid, opt_max=nil)
  limit_suffix = opt_max.nil? ? "" : " #{opt_max}"
  send_command("UNSUB #{sid}#{limit_suffix}#{CR_LF}")
  @flush_queue << :unsub

  sub = @subs[sid]
  return unless sub

  synchronize do
    sub.max = opt_max

    # Keep the subscription only while a limit is set and not reached yet.
    still_pending = sub.max && (sub.received < sub.max)
    @subs.delete(sid) unless still_pending
  end
end