Class: NATS::Client

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin, Status
Defined in:
lib/nats/io/client.rb

Overview

Client creates a connection to the NATS Server.

Constant Summary collapse

DEFAULT_PORT =
4222
DEFAULT_URI =
("nats://localhost:#{DEFAULT_PORT}".freeze)
CR_LF =
("\r\n".freeze)
CR_LF_SIZE =
(CR_LF.bytesize)
PING_REQUEST =
("PING#{CR_LF}".freeze)
PONG_RESPONSE =
("PONG#{CR_LF}".freeze)
NATS_HDR_LINE =
("NATS/1.0#{CR_LF}".freeze)
STATUS_MSG_LEN =
3
STATUS_HDR =
("Status".freeze)
DESC_HDR =
("Description".freeze)
NATS_HDR_LINE_SIZE =
(NATS_HDR_LINE.bytesize)
SUB_OP =
('SUB'.freeze)
EMPTY_MSG =
(''.freeze)

Constants included from Status

Status::CLOSED, Status::CONNECTED, Status::CONNECTING, Status::DISCONNECTED, Status::DRAINING_PUBS, Status::DRAINING_SUBS, Status::RECONNECTING

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeClient

Returns a new instance of Client.



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/nats/io/client.rb', line 109

def initialize
  super # required to initialize monitor
  @options = nil

  # Read/Write IO
  @io = nil

  # Queues for coalescing writes of commands we need to send to server.
  @flush_queue = nil
  @pending_queue = nil

  # Parser with state
  @parser = NATS::Protocol::Parser.new(self)

  # Threads for both reading and flushing command
  @flusher_thread = nil
  @read_loop_thread = nil
  @ping_interval_thread = nil

  # Info that we get from the server
  @server_info = { }

  # URI from server to which we are currently connected
  @uri = nil
  @server_pool = []

  @status = DISCONNECTED

  # Subscriptions
  @subs = { }
  @ssid = 0

  # Ping interval
  @pings_outstanding = 0
  @pongs_received = 0
  @pongs = []
  @pongs.extend(MonitorMixin)

  # Accounting
  @pending_size = 0
  @stats = {
    in_msgs: 0,
    out_msgs: 0,
    in_bytes: 0,
    out_bytes: 0,
    reconnects: 0
  }

  # Sticky error
  @last_err = nil

  # Async callbacks, no ops by default.
  @err_cb = proc { }
  @close_cb = proc { }
  @disconnect_cb = proc { }
  @reconnect_cb = proc { }

  # Secure TLS options
  @tls = nil

  # Hostname of current server; used for when TLS host
  # verification is enabled.
  @hostname = nil
  @single_url_connect_used = false

  # Track whether connect has been already been called.
  @connect_called = false

  # New style request/response implementation.
  @resp_sub = nil
  @resp_map = nil
  @resp_sub_prefix = nil
  @nuid = NATS::NUID.new

  # NKEYS
  @user_credentials = nil
  @nkeys_seed = nil
  @user_nkey_cb = nil
  @user_jwt_cb = nil
  @signature_cb = nil

  # Tokens
  @auth_token = nil

  @inbox_prefix = "_INBOX"

  # Draining
  @drain_t = nil
end

Instance Attribute Details

#connected_serverObject (readonly)

Returns the value of attribute connected_server.



89
90
91
# File 'lib/nats/io/client.rb', line 89

def connected_server
  @connected_server
end

#optionsObject (readonly)

Returns the value of attribute options.



89
90
91
# File 'lib/nats/io/client.rb', line 89

def options
  @options
end

#server_infoObject (readonly)

Returns the value of attribute server_info.



89
90
91
# File 'lib/nats/io/client.rb', line 89

def server_info
  @server_info
end

#server_poolObject (readonly) Also known as: servers

Returns the value of attribute server_pool.



89
90
91
# File 'lib/nats/io/client.rb', line 89

def server_pool
  @server_pool
end

#statsObject (readonly)

Returns the value of attribute stats.



89
90
91
# File 'lib/nats/io/client.rb', line 89

def stats
  @stats
end

#statusObject (readonly)

Returns the value of attribute status.



89
90
91
# File 'lib/nats/io/client.rb', line 89

def status
  @status
end

#uriObject (readonly)

Returns the value of attribute uri.



89
90
91
# File 'lib/nats/io/client.rb', line 89

def uri
  @uri
end

Instance Method Details

#closeObject

Close connection to NATS, flushing in case connection is alive and there are any pending messages, should not be used while holding the lock.



698
699
700
# File 'lib/nats/io/client.rb', line 698

def close
  close_connection(CLOSED, true)
end

#closed?Boolean

Returns:

  • (Boolean)


724
725
726
# File 'lib/nats/io/client.rb', line 724

def closed?
  @status == CLOSED
end

#connect(uri = nil, opts = {}) ⇒ Object

Establishes a connection to NATS.



200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
# File 'lib/nats/io/client.rb', line 200

def connect(uri=nil, opts={})
  synchronize do
    # In case it has been connected already, then do not need to call this again.
    return if @connect_called
    @connect_called = true
  end

  # Convert URI to string if needed.
  uri = uri.to_s if uri.is_a?(URI)

  case uri
  when String
    # Initialize TLS defaults in case any url is using it.
    srvs = opts[:servers] = process_uri(uri)
    if srvs.any? {|u| u.scheme == 'tls'} and !opts[:tls]
      tls_context = OpenSSL::SSL::SSLContext.new
      tls_context.set_params
      opts[:tls] = {
        context: tls_context
      }
    end
    @single_url_connect_used = true if srvs.size == 1
  when Hash
    opts = uri
  end

  opts[:verbose] = false if opts[:verbose].nil?
  opts[:pedantic] = false if opts[:pedantic].nil?
  opts[:reconnect] = true if opts[:reconnect].nil?
  opts[:old_style_request] = false if opts[:old_style_request].nil?
  opts[:ignore_discovered_urls] = false if opts[:ignore_discovered_urls].nil?
  opts[:reconnect_time_wait] = NATS::IO::RECONNECT_TIME_WAIT if opts[:reconnect_time_wait].nil?
  opts[:max_reconnect_attempts] = NATS::IO::MAX_RECONNECT_ATTEMPTS if opts[:max_reconnect_attempts].nil?
  opts[:ping_interval] = NATS::IO::DEFAULT_PING_INTERVAL if opts[:ping_interval].nil?
  opts[:max_outstanding_pings] = NATS::IO::DEFAULT_PING_MAX if opts[:max_outstanding_pings].nil?

  # Override with ENV
  opts[:verbose] = ENV['NATS_VERBOSE'].downcase == 'true' unless ENV['NATS_VERBOSE'].nil?
  opts[:pedantic] = ENV['NATS_PEDANTIC'].downcase == 'true' unless ENV['NATS_PEDANTIC'].nil?
  opts[:reconnect] = ENV['NATS_RECONNECT'].downcase == 'true' unless ENV['NATS_RECONNECT'].nil?
  opts[:reconnect_time_wait] = ENV['NATS_RECONNECT_TIME_WAIT'].to_i unless ENV['NATS_RECONNECT_TIME_WAIT'].nil?
  opts[:ignore_discovered_urls] = ENV['NATS_IGNORE_DISCOVERED_URLS'].downcase == 'true' unless ENV['NATS_IGNORE_DISCOVERED_URLS'].nil?
  opts[:max_reconnect_attempts] = ENV['NATS_MAX_RECONNECT_ATTEMPTS'].to_i unless ENV['NATS_MAX_RECONNECT_ATTEMPTS'].nil?
  opts[:ping_interval] = ENV['NATS_PING_INTERVAL'].to_i unless ENV['NATS_PING_INTERVAL'].nil?
  opts[:max_outstanding_pings] = ENV['NATS_MAX_OUTSTANDING_PINGS'].to_i unless ENV['NATS_MAX_OUTSTANDING_PINGS'].nil?
  opts[:connect_timeout] ||= NATS::IO::DEFAULT_CONNECT_TIMEOUT
  opts[:drain_timeout] ||= NATS::IO::DEFAULT_DRAIN_TIMEOUT
  @options = opts

  # Process servers in the NATS cluster and pick one to connect
  uris = opts[:servers] || [DEFAULT_URI]
  uris.shuffle! unless @options[:dont_randomize_servers]
  uris.each do |u|
    nats_uri = case u
               when URI
                 u.dup
               else
                 URI.parse(u)
               end
    @server_pool << {
      :uri => nats_uri,
      :hostname => nats_uri.hostname
    }
  end

  if @options[:old_style_request]
    # Replace for this instance the implementation
    # of request to use the old_request style.
    class << self; alias_method :request, :old_request; end
  end

  # NKEYS
  @signature_cb ||= opts[:user_signature_cb]
  @user_jwt_cb ||= opts[:user_jwt_cb]
  @user_nkey_cb ||= opts[:user_nkey_cb]
  @user_credentials ||= opts[:user_credentials]
  @nkeys_seed ||= opts[:nkeys_seed]

  setup_nkeys_connect if @user_credentials or @nkeys_seed

  # Tokens, if set will take preference over the user@server uri token
  @auth_token ||= opts[:auth_token]

  # Check for TLS usage
  @tls = @options[:tls]

  @inbox_prefix = opts.fetch(:custom_inbox_prefix, @inbox_prefix)

  validate_settings!

  srv = nil
  begin
    srv = select_next_server

    # Create TCP socket connection to NATS
    @io = create_socket
    @io.connect

    # Capture state that we have had a TCP connection established against
    # this server and could potentially be used for reconnecting.
    srv[:was_connected] = true

    # Connection established and now in process of sending CONNECT to NATS
    @status = CONNECTING

    # Use the hostname from the server for TLS hostname verification.
    if client_using_secure_connection? and single_url_connect_used?
      # Always reuse the original hostname used to connect.
      @hostname ||= srv[:hostname]
    else
      @hostname = srv[:hostname]
    end

    # Established TCP connection successfully so can start connect
    process_connect_init

    # Reset reconnection attempts if connection is valid
    srv[:reconnect_attempts] = 0
    srv[:auth_required] ||= true if @server_info[:auth_required]

    # Add back to rotation since successfully connected
    server_pool << srv
  rescue NATS::IO::NoServersError => e
    @disconnect_cb.call(e) if @disconnect_cb
    raise @last_err || e
  rescue => e
    # Capture sticky error
    synchronize do
      @last_err = e
      srv[:auth_required] ||= true if @server_info[:auth_required]
      server_pool << srv if can_reuse_server?(srv)
    end

    err_cb_call(self, e, nil) if @err_cb

    if should_not_reconnect?
      @disconnect_cb.call(e) if @disconnect_cb
      raise e
    end

    # Clean up any connecting state and close connection without
    # triggering the disconnection/closed callbacks.
    close_connection(DISCONNECTED, false)

    # always sleep here to safe guard against errors before current[:was_connected]
    # is set for the first time
    sleep @options[:reconnect_time_wait] if @options[:reconnect_time_wait]

    # Continue retrying until there are no options left in the server pool
    retry
  end

  # Initialize queues and loops for message dispatching and processing engine
  @flush_queue = SizedQueue.new(NATS::IO::MAX_FLUSH_KICK_SIZE)
  @pending_queue = SizedQueue.new(NATS::IO::MAX_PENDING_SIZE)
  @pings_outstanding = 0
  @pongs_received = 0
  @pending_size = 0

  # Server roundtrip went ok so consider to be connected at this point
  @status = CONNECTED

  # Connected to NATS so Ready to start parser loop, flusher and ping interval
  start_threads!

  self
end

#connected?Boolean

Returns:

  • (Boolean)


712
713
714
# File 'lib/nats/io/client.rb', line 712

def connected?
  @status == CONNECTED
end

#connecting?Boolean

Returns:

  • (Boolean)


716
717
718
# File 'lib/nats/io/client.rb', line 716

def connecting?
  @status == CONNECTING
end

#discovered_serversObject

discovered_servers returns the NATS Servers that have been discovered via INFO protocol updates.



691
692
693
# File 'lib/nats/io/client.rb', line 691

def discovered_servers
  servers.select {|s| s[:discovered] }
end

#drainObject

drain will put a connection into a drain state. All subscriptions will immediately be put into a drain state. Upon completion, the publishers will be drained and can not publish any additional messages. Upon draining of the publishers, the connection will be closed. Use the ‘on_close` callback option to know when the connection has moved from draining to closed.



768
769
770
771
772
773
774
# File 'lib/nats/io/client.rb', line 768

def drain
  return if draining?

  synchronize do
    @drain_t ||= Thread.new { do_drain }
  end
end

#draining?Boolean

Returns:

  • (Boolean)


728
729
730
731
732
733
734
735
736
737
738
739
# File 'lib/nats/io/client.rb', line 728

def draining?
  if @status == DRAINING_PUBS or @status == DRAINING_SUBS
    return true
  end

  is_draining = false
  synchronize do
    is_draining = true if @drain_t
  end

  is_draining
end

#flush(timeout = 10) ⇒ Object

Send a ping and wait for a pong back within a timeout.



671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
# File 'lib/nats/io/client.rb', line 671

def flush(timeout=10)
  # Schedule sending a PING, and block until we receive PONG back,
  # or raise a timeout in case the response is past the deadline.
  pong = @pongs.new_cond
  @pongs.synchronize do
    @pongs << pong

    # Flush once pong future has been prepared
    @pending_queue << PING_REQUEST
    @flush_queue << :ping
    MonotonicTime::with_nats_timeout(timeout) do
      pong.wait(timeout)
    end
  end
end

#jetstream(opts = {}) ⇒ NATS::JetStream Also known as: JetStream, jsm

Create a JetStream context.

Parameters:

  • opts (Hash) (defaults to: {})

    Options to customize the JetStream context.

  • params (Hash)

    a customizable set of options

Returns:



782
783
784
# File 'lib/nats/io/client.rb', line 782

def jetstream(opts={})
  ::NATS::JetStream.new(self, opts)
end

#last_errorObject



757
758
759
760
761
# File 'lib/nats/io/client.rb', line 757

def last_error
  synchronize do
    @last_err
  end
end

#new_inboxString

new_inbox returns a unique inbox used for subscriptions.

Returns:

  • (String)


704
705
706
# File 'lib/nats/io/client.rb', line 704

def new_inbox
  "#{@inbox_prefix}.#{@nuid.next}"
end

#old_request(subject, payload, opts = {}, &blk) ⇒ Object

Sends a request creating an ephemeral subscription for the request, expecting a single response or raising a timeout in case the request is not retrieved within the specified deadline. If given a callback, then the request happens asynchronously.



607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
# File 'lib/nats/io/client.rb', line 607

def old_request(subject, payload, opts={}, &blk)
  return unless subject
  inbox = new_inbox

  # If a callback was passed, then have it process
  # the messages asynchronously and return the sid.
  if blk
    opts[:max] ||= 1
    s = subscribe(inbox, opts) do |msg|
      case blk.arity
      when 0 then blk.call
      when 1 then blk.call(msg)
      when 2 then blk.call(msg.data, msg.reply)
      when 3 then blk.call(msg.data, msg.reply, msg.subject)
      else blk.call(msg.data, msg.reply, msg.subject, msg.header)
      end
    end
    publish(subject, payload, inbox)

    return s
  end

  # In case block was not given, handle synchronously
  # with a timeout and only allow a single response.
  timeout = opts[:timeout] ||= 0.5
  opts[:max] = 1

  sub = Subscription.new
  sub.subject = inbox
  sub.received = 0
  future = sub.new_cond
  sub.future = future
  sub.nc = self

  sid = nil
  synchronize do
    sid = (@ssid += 1)
    sub.sid = sid
    @subs[sid] = sub
  end

  send_command("SUB #{inbox} #{sid}#{CR_LF}")
  @flush_queue << :sub
  unsubscribe(sub, 1)

  sub.synchronize do
    # Publish the request and then wait for the response...
    publish(subject, payload, inbox)

    MonotonicTime::with_nats_timeout(timeout) do
      future.wait(timeout)
    end
  end
  response = sub.response

  if response and response.header
    status = response.header[STATUS_HDR]
    raise NATS::IO::NoRespondersError if status == "503"
  end

  response
end

#on_close(&callback) ⇒ Object



753
754
755
# File 'lib/nats/io/client.rb', line 753

def on_close(&callback)
  @close_cb = callback
end

#on_disconnect(&callback) ⇒ Object



745
746
747
# File 'lib/nats/io/client.rb', line 745

def on_disconnect(&callback)
  @disconnect_cb = callback
end

#on_error(&callback) ⇒ Object



741
742
743
# File 'lib/nats/io/client.rb', line 741

def on_error(&callback)
  @err_cb = callback
end

#on_reconnect(&callback) ⇒ Object



749
750
751
# File 'lib/nats/io/client.rb', line 749

def on_reconnect(&callback)
  @reconnect_cb = callback
end

#publish(subject, msg = EMPTY_MSG, opt_reply = nil, **options, &blk) ⇒ Object



368
369
370
371
372
373
374
375
376
377
378
379
380
381
# File 'lib/nats/io/client.rb', line 368

def publish(subject, msg=EMPTY_MSG, opt_reply=nil, **options, &blk)
  raise NATS::IO::BadSubject if !subject or subject.empty?
  if options[:header]
    return publish_msg(NATS::Msg.new(subject: subject, data: msg, reply: opt_reply, header: options[:header]))
  end

  # Accounting
  msg_size = msg.bytesize
  @stats[:out_msgs] += 1
  @stats[:out_bytes] += msg_size

  send_command("PUB #{subject} #{opt_reply} #{msg_size}\r\n#{msg}\r\n")
  @flush_queue << :pub if @flush_queue.empty?
end

#publish_msg(msg) ⇒ Object

Publishes a NATS::Msg that may include headers.

Raises:

  • (TypeError)


384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
# File 'lib/nats/io/client.rb', line 384

def publish_msg(msg)
  raise TypeError, "nats: expected NATS::Msg, got #{msg.class.name}" unless msg.is_a?(Msg)
  raise NATS::IO::BadSubject if !msg.subject or msg.subject.empty?

  msg.reply ||= ''
  msg.data ||= ''
  msg_size = msg.data.bytesize

  # Accounting
  @stats[:out_msgs] += 1
  @stats[:out_bytes] += msg_size

  if msg.header
    hdr = ''
    hdr << NATS_HDR_LINE
    msg.header.each do |k, v|
      hdr << "#{k}: #{v}#{CR_LF}"
    end
    hdr << CR_LF
    hdr_len = hdr.bytesize
    total_size = msg_size + hdr_len
    send_command("HPUB #{msg.subject} #{msg.reply} #{hdr_len} #{total_size}\r\n#{hdr}#{msg.data}\r\n")
  else
    send_command("PUB #{msg.subject} #{msg.reply} #{msg_size}\r\n#{msg.data}\r\n")
  end

  @flush_queue << :pub if @flush_queue.empty?
end

#reconnecting?Boolean

Returns:

  • (Boolean)


720
721
722
# File 'lib/nats/io/client.rb', line 720

def reconnecting?
  @status == RECONNECTING
end

#request(subject, payload = "", **opts, &blk) ⇒ Object

Sends a request using expecting a single response using a single subscription per connection for receiving the responses. It times out in case the request is not retrieved within the specified deadline. If given a callback, then the request happens asynchronously.



492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
# File 'lib/nats/io/client.rb', line 492

def request(subject, payload="", **opts, &blk)
  raise NATS::IO::BadSubject if !subject or subject.empty?

  # If a block was given then fallback to method using auto unsubscribe.
  return old_request(subject, payload, opts, &blk) if blk
  return old_request(subject, payload, opts) if opts[:old_style]

  if opts[:header]
    return request_msg(NATS::Msg.new(subject: subject, data: payload, header: opts[:header]), **opts)
  end

  token = nil
  inbox = nil
  future = nil
  response = nil
  timeout = opts[:timeout] ||= 0.5
  synchronize do
    start_resp_mux_sub! unless @resp_sub_prefix

    # Create token for this request.
    token = @nuid.next
    inbox = "#{@resp_sub_prefix}.#{token}"

    # Create the a future for the request that will
    # get signaled when it receives the request.
    future = @resp_sub.new_cond
    @resp_map[token][:future] = future
  end

  # Publish request and wait for reply.
  publish(subject, payload, inbox)
  begin
    MonotonicTime::with_nats_timeout(timeout) do
      @resp_sub.synchronize do
        future.wait(timeout)
      end
    end
  rescue NATS::Timeout => e
    synchronize { @resp_map.delete(token) }
    raise e
  end

  # Check if there is a response already.
  synchronize do
    result = @resp_map[token]
    response = result[:response]
    @resp_map.delete(token)
  end

  if response and response.header
    status = response.header[STATUS_HDR]
    raise NATS::IO::NoRespondersError if status == "503"
  end

  response
end

#request_msg(msg, **opts) ⇒ Object

request_msg makes a NATS request using a NATS::Msg that may include headers.

Raises:

  • (TypeError)


550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
# File 'lib/nats/io/client.rb', line 550

def request_msg(msg, **opts)
  raise TypeError, "nats: expected NATS::Msg, got #{msg.class.name}" unless msg.is_a?(Msg)
  raise NATS::IO::BadSubject if !msg.subject or msg.subject.empty?

  token = nil
  inbox = nil
  future = nil
  response = nil
  timeout = opts[:timeout] ||= 0.5
  synchronize do
    start_resp_mux_sub! unless @resp_sub_prefix

    # Create token for this request.
    token = @nuid.next
    inbox = "#{@resp_sub_prefix}.#{token}"

    # Create the a future for the request that will
    # get signaled when it receives the request.
    future = @resp_sub.new_cond
    @resp_map[token][:future] = future
  end
  msg.reply = inbox
  msg.data ||= ''
  msg_size = msg.data.bytesize

  # Publish request and wait for reply.
  publish_msg(msg)
  begin
    MonotonicTime::with_nats_timeout(timeout) do
      @resp_sub.synchronize do
        future.wait(timeout)
      end
    end
  rescue NATS::Timeout => e
    synchronize { @resp_map.delete(token) }
    raise e
  end

  # Check if there is a response already.
  synchronize do
    result = @resp_map[token]
    response = result[:response]
    @resp_map.delete(token)
  end

  if response and response.header
    status = response.header[STATUS_HDR]
    raise NATS::IO::NoRespondersError if status == "503"
  end

  response
end

#subscribe(subject, opts = {}, &callback) ⇒ Object

Create subscription which is dispatched asynchronously messages to a callback.



415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
# File 'lib/nats/io/client.rb', line 415

def subscribe(subject, opts={}, &callback)
  raise NATS::IO::ConnectionDrainingError.new("nats: connection draining") if draining?

  sid = nil
  sub = nil
  synchronize do
    sid = (@ssid += 1)
    sub = @subs[sid] = Subscription.new
    sub.nc = self
    sub.sid = sid
  end
  opts[:pending_msgs_limit]  ||= NATS::IO::DEFAULT_SUB_PENDING_MSGS_LIMIT
  opts[:pending_bytes_limit] ||= NATS::IO::DEFAULT_SUB_PENDING_BYTES_LIMIT

  sub.subject = subject
  sub.callback = callback
  sub.received = 0
  sub.queue = opts[:queue] if opts[:queue]
  sub.max = opts[:max] if opts[:max]
  sub.pending_msgs_limit  = opts[:pending_msgs_limit]
  sub.pending_bytes_limit = opts[:pending_bytes_limit]
  sub.pending_queue = SizedQueue.new(sub.pending_msgs_limit)

  send_command("SUB #{subject} #{opts[:queue]} #{sid}#{CR_LF}")
  @flush_queue << :sub

  # Setup server support for auto-unsubscribe when receiving enough messages
  sub.unsubscribe(opts[:max]) if opts[:max]

  unless callback
    cond = sub.new_cond
    sub.wait_for_msgs_cond = cond
  end

  # Async subscriptions each own a single thread for the
  # delivery of messages.
  # FIXME: Support shared thread pool with configurable limits
  # to better support case of having a lot of subscriptions.
  sub.wait_for_msgs_t = Thread.new do
    loop do
      msg = sub.pending_queue.pop

      cb = nil
      sub.synchronize do

        # Decrease pending size since consumed already
        sub.pending_size -= msg.data.size
        cb = sub.callback
      end

      begin
        # Note: Keep some of the alternative arity versions to slightly
        # improve backwards compatibility.  Eventually fine to deprecate
        # since recommended version would be arity of 1 to get a NATS::Msg.
        case cb.arity
        when 0 then cb.call
        when 1 then cb.call(msg)
        when 2 then cb.call(msg.data, msg.reply)
        when 3 then cb.call(msg.data, msg.reply, msg.subject)
        else cb.call(msg.data, msg.reply, msg.subject, msg.header)
        end
      rescue => e
        synchronize do
          err_cb_call(self, e, sub) if @err_cb
        end
      end
    end
  end if callback

  sub
end