Class: NATS::Client
- Inherits:
-
Object
- Object
- NATS::Client
- Includes:
- MonitorMixin, Status
- Defined in:
- lib/nats/io/client.rb
Overview
Client creates a connection to the NATS Server.
Constant Summary collapse
- DEFAULT_PORT =
4222
- DEFAULT_URI =
("nats://localhost:#{DEFAULT_PORT}".freeze)
- CR_LF =
("\r\n".freeze)
- CR_LF_SIZE =
(CR_LF.bytesize)
- PING_REQUEST =
("PING#{CR_LF}".freeze)
- PONG_RESPONSE =
("PONG#{CR_LF}".freeze)
- NATS_HDR_LINE =
("NATS/1.0#{CR_LF}".freeze)
- STATUS_MSG_LEN =
3
- STATUS_HDR =
("Status".freeze)
- DESC_HDR =
("Description".freeze)
- NATS_HDR_LINE_SIZE =
(NATS_HDR_LINE.bytesize)
- SUB_OP =
('SUB'.freeze)
- EMPTY_MSG =
(''.freeze)
Constants included from Status
Status::CLOSED, Status::CONNECTED, Status::CONNECTING, Status::DISCONNECTED, Status::DRAINING_PUBS, Status::DRAINING_SUBS, Status::RECONNECTING
Instance Attribute Summary collapse
-
#connected_server ⇒ Object
readonly
Returns the value of attribute connected_server.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#server_info ⇒ Object
readonly
Returns the value of attribute server_info.
-
#server_pool ⇒ Object
(also: #servers)
readonly
Returns the value of attribute server_pool.
-
#stats ⇒ Object
readonly
Returns the value of attribute stats.
-
#status ⇒ Object
readonly
Returns the value of attribute status.
-
#uri ⇒ Object
readonly
Returns the value of attribute uri.
Instance Method Summary collapse
-
#close ⇒ Object
Close connection to NATS, flushing in case connection is alive and there are any pending messages, should not be used while holding the lock.
- #closed? ⇒ Boolean
-
#connect(uri = nil, opts = {}) ⇒ Object
Establishes a connection to NATS.
- #connected? ⇒ Boolean
- #connecting? ⇒ Boolean
-
#discovered_servers ⇒ Object
discovered_servers returns the NATS Servers that have been discovered via INFO protocol updates.
-
#drain ⇒ Object
drain will put a connection into a drain state.
- #draining? ⇒ Boolean
-
#flush(timeout = 10) ⇒ Object
Send a ping and wait for a pong back within a timeout.
-
#initialize ⇒ Client
constructor
A new instance of Client.
-
#jetstream(opts = {}) ⇒ NATS::JetStream
(also: #JetStream, #jsm)
Create a JetStream context.
- #last_error ⇒ Object
-
#new_inbox ⇒ String
new_inbox returns a unique inbox used for subscriptions.
-
#old_request(subject, payload, opts = {}, &blk) ⇒ Object
Sends a request creating an ephemeral subscription for the request, expecting a single response or raising a timeout in case the request is not retrieved within the specified deadline.
- #on_close(&callback) ⇒ Object
- #on_disconnect(&callback) ⇒ Object
- #on_error(&callback) ⇒ Object
- #on_reconnect(&callback) ⇒ Object
- #publish(subject, msg = EMPTY_MSG, opt_reply = nil, **options, &blk) ⇒ Object
-
#publish_msg(msg) ⇒ Object
Publishes a NATS::Msg that may include headers.
- #reconnecting? ⇒ Boolean
-
#request(subject, payload = "", **opts, &blk) ⇒ Object
Sends a request using expecting a single response using a single subscription per connection for receiving the responses.
-
#request_msg(msg, **opts) ⇒ Object
request_msg makes a NATS request using a NATS::Msg that may include headers.
-
#subscribe(subject, opts = {}, &callback) ⇒ Object
Create subscription which is dispatched asynchronously messages to a callback.
Constructor Details
#initialize ⇒ Client
Returns a new instance of Client.
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 |
# File 'lib/nats/io/client.rb', line 109 def initialize super # required to initialize monitor @options = nil # Read/Write IO @io = nil # Queues for coalescing writes of commands we need to send to server. @flush_queue = nil @pending_queue = nil # Parser with state @parser = NATS::Protocol::Parser.new(self) # Threads for both reading and flushing command @flusher_thread = nil @read_loop_thread = nil @ping_interval_thread = nil # Info that we get from the server @server_info = { } # URI from server to which we are currently connected @uri = nil @server_pool = [] @status = DISCONNECTED # Subscriptions @subs = { } @ssid = 0 # Ping interval @pings_outstanding = 0 @pongs_received = 0 @pongs = [] @pongs.extend(MonitorMixin) # Accounting @pending_size = 0 @stats = { in_msgs: 0, out_msgs: 0, in_bytes: 0, out_bytes: 0, reconnects: 0 } # Sticky error @last_err = nil # Async callbacks, no ops by default. @err_cb = proc { } @close_cb = proc { } @disconnect_cb = proc { } @reconnect_cb = proc { } # Secure TLS options @tls = nil # Hostname of current server; used for when TLS host # verification is enabled. @hostname = nil @single_url_connect_used = false # Track whether connect has been already been called. @connect_called = false # New style request/response implementation. @resp_sub = nil @resp_map = nil @resp_sub_prefix = nil @nuid = NATS::NUID.new # NKEYS @user_credentials = nil @nkeys_seed = nil @user_nkey_cb = nil @user_jwt_cb = nil @signature_cb = nil # Tokens @auth_token = nil @inbox_prefix = "_INBOX" # Draining @drain_t = nil end |
Instance Attribute Details
#connected_server ⇒ Object (readonly)
Returns the value of attribute connected_server.
89 90 91 |
# File 'lib/nats/io/client.rb', line 89 def connected_server @connected_server end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
89 90 91 |
# File 'lib/nats/io/client.rb', line 89 def @options end |
#server_info ⇒ Object (readonly)
Returns the value of attribute server_info.
89 90 91 |
# File 'lib/nats/io/client.rb', line 89 def server_info @server_info end |
#server_pool ⇒ Object (readonly) Also known as: servers
Returns the value of attribute server_pool.
89 90 91 |
# File 'lib/nats/io/client.rb', line 89 def server_pool @server_pool end |
#stats ⇒ Object (readonly)
Returns the value of attribute stats.
89 90 91 |
# File 'lib/nats/io/client.rb', line 89 def stats @stats end |
#status ⇒ Object (readonly)
Returns the value of attribute status.
89 90 91 |
# File 'lib/nats/io/client.rb', line 89 def status @status end |
#uri ⇒ Object (readonly)
Returns the value of attribute uri.
89 90 91 |
# File 'lib/nats/io/client.rb', line 89 def uri @uri end |
Instance Method Details
#close ⇒ Object
Close connection to NATS, flushing in case connection is alive and there are any pending messages, should not be used while holding the lock.
698 699 700 |
# File 'lib/nats/io/client.rb', line 698 def close close_connection(CLOSED, true) end |
#closed? ⇒ Boolean
724 725 726 |
# File 'lib/nats/io/client.rb', line 724 def closed? @status == CLOSED end |
#connect(uri = nil, opts = {}) ⇒ Object
Establishes a connection to NATS.
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 |
# File 'lib/nats/io/client.rb', line 200 def connect(uri=nil, opts={}) synchronize do # In case it has been connected already, then do not need to call this again. return if @connect_called @connect_called = true end # Convert URI to string if needed. uri = uri.to_s if uri.is_a?(URI) case uri when String # Initialize TLS defaults in case any url is using it. srvs = opts[:servers] = process_uri(uri) if srvs.any? {|u| u.scheme == 'tls'} and !opts[:tls] tls_context = OpenSSL::SSL::SSLContext.new tls_context.set_params opts[:tls] = { context: tls_context } end @single_url_connect_used = true if srvs.size == 1 when Hash opts = uri end opts[:verbose] = false if opts[:verbose].nil? opts[:pedantic] = false if opts[:pedantic].nil? opts[:reconnect] = true if opts[:reconnect].nil? opts[:old_style_request] = false if opts[:old_style_request].nil? opts[:ignore_discovered_urls] = false if opts[:ignore_discovered_urls].nil? opts[:reconnect_time_wait] = NATS::IO::RECONNECT_TIME_WAIT if opts[:reconnect_time_wait].nil? opts[:max_reconnect_attempts] = NATS::IO::MAX_RECONNECT_ATTEMPTS if opts[:max_reconnect_attempts].nil? opts[:ping_interval] = NATS::IO::DEFAULT_PING_INTERVAL if opts[:ping_interval].nil? opts[:max_outstanding_pings] = NATS::IO::DEFAULT_PING_MAX if opts[:max_outstanding_pings].nil? # Override with ENV opts[:verbose] = ENV['NATS_VERBOSE'].downcase == 'true' unless ENV['NATS_VERBOSE'].nil? opts[:pedantic] = ENV['NATS_PEDANTIC'].downcase == 'true' unless ENV['NATS_PEDANTIC'].nil? opts[:reconnect] = ENV['NATS_RECONNECT'].downcase == 'true' unless ENV['NATS_RECONNECT'].nil? opts[:reconnect_time_wait] = ENV['NATS_RECONNECT_TIME_WAIT'].to_i unless ENV['NATS_RECONNECT_TIME_WAIT'].nil? opts[:ignore_discovered_urls] = ENV['NATS_IGNORE_DISCOVERED_URLS'].downcase == 'true' unless ENV['NATS_IGNORE_DISCOVERED_URLS'].nil? opts[:max_reconnect_attempts] = ENV['NATS_MAX_RECONNECT_ATTEMPTS'].to_i unless ENV['NATS_MAX_RECONNECT_ATTEMPTS'].nil? opts[:ping_interval] = ENV['NATS_PING_INTERVAL'].to_i unless ENV['NATS_PING_INTERVAL'].nil? opts[:max_outstanding_pings] = ENV['NATS_MAX_OUTSTANDING_PINGS'].to_i unless ENV['NATS_MAX_OUTSTANDING_PINGS'].nil? opts[:connect_timeout] ||= NATS::IO::DEFAULT_CONNECT_TIMEOUT opts[:drain_timeout] ||= NATS::IO::DEFAULT_DRAIN_TIMEOUT @options = opts # Process servers in the NATS cluster and pick one to connect uris = opts[:servers] || [DEFAULT_URI] uris.shuffle! unless @options[:dont_randomize_servers] uris.each do |u| nats_uri = case u when URI u.dup else URI.parse(u) end @server_pool << { :uri => nats_uri, :hostname => nats_uri.hostname } end if @options[:old_style_request] # Replace for this instance the implementation # of request to use the old_request style. class << self; alias_method :request, :old_request; end end # NKEYS @signature_cb ||= opts[:user_signature_cb] @user_jwt_cb ||= opts[:user_jwt_cb] @user_nkey_cb ||= opts[:user_nkey_cb] @user_credentials ||= opts[:user_credentials] @nkeys_seed ||= opts[:nkeys_seed] setup_nkeys_connect if @user_credentials or @nkeys_seed # Tokens, if set will take preference over the user@server uri token @auth_token ||= opts[:auth_token] # Check for TLS usage @tls = @options[:tls] @inbox_prefix = opts.fetch(:custom_inbox_prefix, @inbox_prefix) validate_settings! srv = nil begin srv = select_next_server # Create TCP socket connection to NATS @io = create_socket @io.connect # Capture state that we have had a TCP connection established against # this server and could potentially be used for reconnecting. srv[:was_connected] = true # Connection established and now in process of sending CONNECT to NATS @status = CONNECTING # Use the hostname from the server for TLS hostname verification. if client_using_secure_connection? and single_url_connect_used? # Always reuse the original hostname used to connect. @hostname ||= srv[:hostname] else @hostname = srv[:hostname] end # Established TCP connection successfully so can start connect process_connect_init # Reset reconnection attempts if connection is valid srv[:reconnect_attempts] = 0 srv[:auth_required] ||= true if @server_info[:auth_required] # Add back to rotation since successfully connected server_pool << srv rescue NATS::IO::NoServersError => e @disconnect_cb.call(e) if @disconnect_cb raise @last_err || e rescue => e # Capture sticky error synchronize do @last_err = e srv[:auth_required] ||= true if @server_info[:auth_required] server_pool << srv if can_reuse_server?(srv) end err_cb_call(self, e, nil) if @err_cb if should_not_reconnect? @disconnect_cb.call(e) if @disconnect_cb raise e end # Clean up any connecting state and close connection without # triggering the disconnection/closed callbacks. close_connection(DISCONNECTED, false) # always sleep here to safe guard against errors before current[:was_connected] # is set for the first time sleep @options[:reconnect_time_wait] if @options[:reconnect_time_wait] # Continue retrying until there are no options left in the server pool retry end # Initialize queues and loops for message dispatching and processing engine @flush_queue = SizedQueue.new(NATS::IO::MAX_FLUSH_KICK_SIZE) @pending_queue = SizedQueue.new(NATS::IO::MAX_PENDING_SIZE) @pings_outstanding = 0 @pongs_received = 0 @pending_size = 0 # Server roundtrip went ok so consider to be connected at this point @status = CONNECTED # Connected to NATS so Ready to start parser loop, flusher and ping interval start_threads! self end |
#connected? ⇒ Boolean
712 713 714 |
# File 'lib/nats/io/client.rb', line 712 def connected? @status == CONNECTED end |
#connecting? ⇒ Boolean
716 717 718 |
# File 'lib/nats/io/client.rb', line 716 def connecting? @status == CONNECTING end |
#discovered_servers ⇒ Object
discovered_servers returns the NATS Servers that have been discovered via INFO protocol updates.
691 692 693 |
# File 'lib/nats/io/client.rb', line 691 def discovered_servers servers.select {|s| s[:discovered] } end |
#drain ⇒ Object
drain will put a connection into a drain state. All subscriptions will immediately be put into a drain state. Upon completion, the publishers will be drained and can not publish any additional messages. Upon draining of the publishers, the connection will be closed. Use the ‘on_close` callback option to know when the connection has moved from draining to closed.
768 769 770 771 772 773 774 |
# File 'lib/nats/io/client.rb', line 768 def drain return if draining? synchronize do @drain_t ||= Thread.new { do_drain } end end |
#draining? ⇒ Boolean
728 729 730 731 732 733 734 735 736 737 738 739 |
# File 'lib/nats/io/client.rb', line 728 def draining? if @status == DRAINING_PUBS or @status == DRAINING_SUBS return true end is_draining = false synchronize do is_draining = true if @drain_t end is_draining end |
#flush(timeout = 10) ⇒ Object
Send a ping and wait for a pong back within a timeout.
671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 |
# File 'lib/nats/io/client.rb', line 671 def flush(timeout=10) # Schedule sending a PING, and block until we receive PONG back, # or raise a timeout in case the response is past the deadline. pong = @pongs.new_cond @pongs.synchronize do @pongs << pong # Flush once pong future has been prepared @pending_queue << PING_REQUEST @flush_queue << :ping MonotonicTime::with_nats_timeout(timeout) do pong.wait(timeout) end end end |
#jetstream(opts = {}) ⇒ NATS::JetStream Also known as: JetStream, jsm
Create a JetStream context.
782 783 784 |
# File 'lib/nats/io/client.rb', line 782 def jetstream(opts={}) ::NATS::JetStream.new(self, opts) end |
#last_error ⇒ Object
757 758 759 760 761 |
# File 'lib/nats/io/client.rb', line 757 def last_error synchronize do @last_err end end |
#new_inbox ⇒ String
new_inbox returns a unique inbox used for subscriptions.
704 705 706 |
# File 'lib/nats/io/client.rb', line 704 def new_inbox "#{@inbox_prefix}.#{@nuid.next}" end |
#old_request(subject, payload, opts = {}, &blk) ⇒ Object
Sends a request creating an ephemeral subscription for the request, expecting a single response or raising a timeout in case the request is not retrieved within the specified deadline. If given a callback, then the request happens asynchronously.
607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 |
# File 'lib/nats/io/client.rb', line 607 def old_request(subject, payload, opts={}, &blk) return unless subject inbox = new_inbox # If a callback was passed, then have it process # the messages asynchronously and return the sid. if blk opts[:max] ||= 1 s = subscribe(inbox, opts) do |msg| case blk.arity when 0 then blk.call when 1 then blk.call(msg) when 2 then blk.call(msg.data, msg.reply) when 3 then blk.call(msg.data, msg.reply, msg.subject) else blk.call(msg.data, msg.reply, msg.subject, msg.header) end end publish(subject, payload, inbox) return s end # In case block was not given, handle synchronously # with a timeout and only allow a single response. timeout = opts[:timeout] ||= 0.5 opts[:max] = 1 sub = Subscription.new sub.subject = inbox sub.received = 0 future = sub.new_cond sub.future = future sub.nc = self sid = nil synchronize do sid = (@ssid += 1) sub.sid = sid @subs[sid] = sub end send_command("SUB #{inbox} #{sid}#{CR_LF}") @flush_queue << :sub unsubscribe(sub, 1) sub.synchronize do # Publish the request and then wait for the response... publish(subject, payload, inbox) MonotonicTime::with_nats_timeout(timeout) do future.wait(timeout) end end response = sub.response if response and response.header status = response.header[STATUS_HDR] raise NATS::IO::NoRespondersError if status == "503" end response end |
#on_close(&callback) ⇒ Object
753 754 755 |
# File 'lib/nats/io/client.rb', line 753 def on_close(&callback) @close_cb = callback end |
#on_disconnect(&callback) ⇒ Object
745 746 747 |
# File 'lib/nats/io/client.rb', line 745 def on_disconnect(&callback) @disconnect_cb = callback end |
#on_error(&callback) ⇒ Object
741 742 743 |
# File 'lib/nats/io/client.rb', line 741 def on_error(&callback) @err_cb = callback end |
#on_reconnect(&callback) ⇒ Object
749 750 751 |
# File 'lib/nats/io/client.rb', line 749 def on_reconnect(&callback) @reconnect_cb = callback end |
#publish(subject, msg = EMPTY_MSG, opt_reply = nil, **options, &blk) ⇒ Object
368 369 370 371 372 373 374 375 376 377 378 379 380 381 |
# File 'lib/nats/io/client.rb', line 368 def publish(subject, msg=EMPTY_MSG, opt_reply=nil, **, &blk) raise NATS::IO::BadSubject if !subject or subject.empty? if [:header] return publish_msg(NATS::Msg.new(subject: subject, data: msg, reply: opt_reply, header: [:header])) end # Accounting msg_size = msg.bytesize @stats[:out_msgs] += 1 @stats[:out_bytes] += msg_size send_command("PUB #{subject} #{opt_reply} #{msg_size}\r\n#{msg}\r\n") @flush_queue << :pub if @flush_queue.empty? end |
#publish_msg(msg) ⇒ Object
Publishes a NATS::Msg that may include headers.
384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 |
# File 'lib/nats/io/client.rb', line 384 def publish_msg(msg) raise TypeError, "nats: expected NATS::Msg, got #{msg.class.name}" unless msg.is_a?(Msg) raise NATS::IO::BadSubject if !msg.subject or msg.subject.empty? msg.reply ||= '' msg.data ||= '' msg_size = msg.data.bytesize # Accounting @stats[:out_msgs] += 1 @stats[:out_bytes] += msg_size if msg.header hdr = '' hdr << NATS_HDR_LINE msg.header.each do |k, v| hdr << "#{k}: #{v}#{CR_LF}" end hdr << CR_LF hdr_len = hdr.bytesize total_size = msg_size + hdr_len send_command("HPUB #{msg.subject} #{msg.reply} #{hdr_len} #{total_size}\r\n#{hdr}#{msg.data}\r\n") else send_command("PUB #{msg.subject} #{msg.reply} #{msg_size}\r\n#{msg.data}\r\n") end @flush_queue << :pub if @flush_queue.empty? end |
#reconnecting? ⇒ Boolean
720 721 722 |
# File 'lib/nats/io/client.rb', line 720 def reconnecting? @status == RECONNECTING end |
#request(subject, payload = "", **opts, &blk) ⇒ Object
Sends a request using expecting a single response using a single subscription per connection for receiving the responses. It times out in case the request is not retrieved within the specified deadline. If given a callback, then the request happens asynchronously.
492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 |
# File 'lib/nats/io/client.rb', line 492 def request(subject, payload="", **opts, &blk) raise NATS::IO::BadSubject if !subject or subject.empty? # If a block was given then fallback to method using auto unsubscribe. return old_request(subject, payload, opts, &blk) if blk return old_request(subject, payload, opts) if opts[:old_style] if opts[:header] return request_msg(NATS::Msg.new(subject: subject, data: payload, header: opts[:header]), **opts) end token = nil inbox = nil future = nil response = nil timeout = opts[:timeout] ||= 0.5 synchronize do start_resp_mux_sub! unless @resp_sub_prefix # Create token for this request. token = @nuid.next inbox = "#{@resp_sub_prefix}.#{token}" # Create the a future for the request that will # get signaled when it receives the request. future = @resp_sub.new_cond @resp_map[token][:future] = future end # Publish request and wait for reply. publish(subject, payload, inbox) begin MonotonicTime::with_nats_timeout(timeout) do @resp_sub.synchronize do future.wait(timeout) end end rescue NATS::Timeout => e synchronize { @resp_map.delete(token) } raise e end # Check if there is a response already. synchronize do result = @resp_map[token] response = result[:response] @resp_map.delete(token) end if response and response.header status = response.header[STATUS_HDR] raise NATS::IO::NoRespondersError if status == "503" end response end |
#request_msg(msg, **opts) ⇒ Object
request_msg makes a NATS request using a NATS::Msg that may include headers.
550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 |
# File 'lib/nats/io/client.rb', line 550 def request_msg(msg, **opts) raise TypeError, "nats: expected NATS::Msg, got #{msg.class.name}" unless msg.is_a?(Msg) raise NATS::IO::BadSubject if !msg.subject or msg.subject.empty? token = nil inbox = nil future = nil response = nil timeout = opts[:timeout] ||= 0.5 synchronize do start_resp_mux_sub! unless @resp_sub_prefix # Create token for this request. token = @nuid.next inbox = "#{@resp_sub_prefix}.#{token}" # Create the a future for the request that will # get signaled when it receives the request. future = @resp_sub.new_cond @resp_map[token][:future] = future end msg.reply = inbox msg.data ||= '' msg_size = msg.data.bytesize # Publish request and wait for reply. publish_msg(msg) begin MonotonicTime::with_nats_timeout(timeout) do @resp_sub.synchronize do future.wait(timeout) end end rescue NATS::Timeout => e synchronize { @resp_map.delete(token) } raise e end # Check if there is a response already. synchronize do result = @resp_map[token] response = result[:response] @resp_map.delete(token) end if response and response.header status = response.header[STATUS_HDR] raise NATS::IO::NoRespondersError if status == "503" end response end |
#subscribe(subject, opts = {}, &callback) ⇒ Object
Create subscription which is dispatched asynchronously messages to a callback.
415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 |
# File 'lib/nats/io/client.rb', line 415 def subscribe(subject, opts={}, &callback) raise NATS::IO::ConnectionDrainingError.new("nats: connection draining") if draining? sid = nil sub = nil synchronize do sid = (@ssid += 1) sub = @subs[sid] = Subscription.new sub.nc = self sub.sid = sid end opts[:pending_msgs_limit] ||= NATS::IO::DEFAULT_SUB_PENDING_MSGS_LIMIT opts[:pending_bytes_limit] ||= NATS::IO::DEFAULT_SUB_PENDING_BYTES_LIMIT sub.subject = subject sub.callback = callback sub.received = 0 sub.queue = opts[:queue] if opts[:queue] sub.max = opts[:max] if opts[:max] sub.pending_msgs_limit = opts[:pending_msgs_limit] sub.pending_bytes_limit = opts[:pending_bytes_limit] sub.pending_queue = SizedQueue.new(sub.pending_msgs_limit) send_command("SUB #{subject} #{opts[:queue]} #{sid}#{CR_LF}") @flush_queue << :sub # Setup server support for auto-unsubscribe when receiving enough messages sub.unsubscribe(opts[:max]) if opts[:max] unless callback cond = sub.new_cond sub.wait_for_msgs_cond = cond end # Async subscriptions each own a single thread for the # delivery of messages. # FIXME: Support shared thread pool with configurable limits # to better support case of having a lot of subscriptions. sub.wait_for_msgs_t = Thread.new do loop do msg = sub.pending_queue.pop cb = nil sub.synchronize do # Decrease pending size since consumed already sub.pending_size -= msg.data.size cb = sub.callback end begin # Note: Keep some of the alternative arity versions to slightly # improve backwards compatibility. Eventually fine to deprecate # since recommended version would be arity of 1 to get a NATS::Msg. case cb.arity when 0 then cb.call when 1 then cb.call(msg) when 2 then cb.call(msg.data, msg.reply) when 3 then cb.call(msg.data, msg.reply, msg.subject) else cb.call(msg.data, msg.reply, msg.subject, msg.header) end rescue => e synchronize do err_cb_call(self, e, sub) if @err_cb end end end end if callback sub end |