Class: NATS::Client

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin, Status
Defined in:
lib/nats/io/client.rb

Overview

Client creates a connection to the NATS Server.

Constant Summary collapse

# Port used by default for each URI scheme (nats, ws, wss).
DEFAULT_PORT =
{ nats: 4222, ws: 80, wss: 443 }.freeze
# Default URI string: localhost on the default `nats` port.
DEFAULT_URI =
("nats://localhost:#{DEFAULT_PORT[:nats]}".freeze)
# Carriage return + line feed sequence that terminates protocol lines.
CR_LF =
("\r\n".freeze)
# Size in bytes of CR_LF.
CR_LF_SIZE =
(CR_LF.bytesize)
# PING command line, queued by #flush.
PING_REQUEST =
("PING#{CR_LF}".freeze)
# PONG command line.
PONG_RESPONSE =
("PONG#{CR_LF}".freeze)
# First line of the header block written by #publish_msg.
NATS_HDR_LINE =
("NATS/1.0#{CR_LF}".freeze)
# NOTE(review): presumably the length of a status code such as "503" — confirm against the parser.
STATUS_MSG_LEN =
3
# Header key holding the status of a response (e.g. "503" for no responders).
STATUS_HDR =
("Status".freeze)
# Header key named "Description"; not referenced by the methods shown here.
DESC_HDR =
("Description".freeze)
# Size in bytes of NATS_HDR_LINE.
NATS_HDR_LINE_SIZE =
(NATS_HDR_LINE.bytesize)
# The string 'SUB'; not referenced by the methods shown here.
SUB_OP =
('SUB'.freeze)
# Default payload of #publish.
EMPTY_MSG =
(''.freeze)

Constants included from Status

Status::CLOSED, Status::CONNECTED, Status::CONNECTING, Status::DISCONNECTED, Status::DRAINING_PUBS, Status::DRAINING_SUBS, Status::RECONNECTING

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(uri = nil, opts = {}) ⇒ Client

Returns a new instance of Client.



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# File 'lib/nats/io/client.rb', line 152

# Builds a client and resets all of its internal state.
#
# The given uri and opts are remembered as the initial connection settings.
# Options are parsed and validated right away only when a uri or a non-empty
# opts hash was given.
#
# @param uri [Object, nil] initial URI, stored as-is in @initial_uri
# @param opts [Hash] initial options; :reloader is read here, falling back to
#   the class-level default reloader
def initialize(uri = nil, opts = {})
  super() # required to initialize monitor
  @initial_uri = uri
  @initial_options = opts

  # Read/Write IO
  @io = nil

  # Queues for coalescing writes of commands we need to send to server.
  @flush_queue = nil
  @pending_queue = nil

  # Parser with state
  @parser = NATS::Protocol::Parser.new(self)

  # Threads for reading, flushing commands and the ping interval
  @flusher_thread = nil
  @read_loop_thread = nil
  @ping_interval_thread = nil

  # Info that we get from the server
  @server_info = { }

  # URI from server to which we are currently connected
  @uri = nil
  @server_pool = []

  # No status yet; #disconnected? treats a nil status as disconnected.
  @status = nil

  # Subscriptions, keyed by subscription id (sid); @ssid is the last sid handed out.
  @subs = { }
  @ssid = 0

  # Ping interval
  @pings_outstanding = 0
  @pongs_received = 0
  # Conditions of pending #flush calls; the array doubles as their monitor.
  @pongs = []
  @pongs.extend(MonitorMixin)

  # Accounting
  @pending_size = 0
  @stats = {
    in_msgs: 0,
    out_msgs: 0,
    in_bytes: 0,
    out_bytes: 0,
    reconnects: 0
  }

  # Sticky error
  @last_err = nil

  # Async callbacks, no ops by default.
  @err_cb = proc { }
  @close_cb = proc { }
  @disconnect_cb = proc { }
  @reconnect_cb = proc { }

  # Secure TLS options
  @tls = nil

  # Hostname of current server; used for when TLS host
  # verification is enabled.
  @hostname = nil
  @single_url_connect_used = false

  # Track whether connect has already been called.
  @connect_called = false

  # New style request/response implementation.
  @resp_sub = nil
  @resp_map = nil
  @resp_sub_prefix = nil
  @nuid = NATS::NUID.new

  # NKEYS
  @user_credentials = nil
  @nkeys_seed = nil
  @user_nkey_cb = nil
  @user_jwt_cb = nil
  @signature_cb = nil

  # Tokens
  @auth_token = nil

  # Prefix of the inboxes generated by #new_inbox.
  @inbox_prefix = "_INBOX"

  # Draining
  @drain_t = nil

  # Prepare for calling connect or automatic delayed connection
  parse_and_validate_options if uri || opts.any?

  # Keep track of all client instances to handle them after process forking in Ruby 3.1+
  INSTANCES[self] = self if !defined?(Ractor) || Ractor.current == Ractor.main # Ractors don't work in forked processes

  # Explicit :reloader option wins over the class-level default.
  @reloader = opts.fetch(:reloader, self.class.default_reloader)
end

Class Attribute Details

.default_reloaderObject



131
132
133
# File 'lib/nats/io/client.rb', line 131

# Returns the memoized default reloader: a proc that simply calls the block
# it is given. When Ractor is defined the proc is made shareable.
def default_reloader
  return @default_reloader if @default_reloader

  @default_reloader = proc { |&block| block.call }.tap do |reloader|
    Ractor.make_shareable(reloader) if defined? Ractor
  end
end

Instance Attribute Details

#connected_serverObject (readonly)

Returns the value of attribute connected_server.



103
104
105
# File 'lib/nats/io/client.rb', line 103

# Reader for @connected_server.
# NOTE(review): the constructor does not assign this variable, so it reads as nil until set elsewhere.
def connected_server
  @connected_server
end

#optionsObject (readonly)

Returns the value of attribute options.



103
104
105
# File 'lib/nats/io/client.rb', line 103

# Reader for @options.
# NOTE(review): presumably filled in by parse_and_validate_options — confirm.
def options
  @options
end

#reloaderObject (readonly)

Returns the value of attribute reloader.



103
104
105
# File 'lib/nats/io/client.rb', line 103

# Reader for @reloader, which the constructor takes from the :reloader option
# or, when absent, from the class-level default reloader.
def reloader
  @reloader
end

#server_infoObject (readonly)

Returns the value of attribute server_info.



103
104
105
# File 'lib/nats/io/client.rb', line 103

# Reader for @server_info, the info received from the server (an empty hash initially).
def server_info
  @server_info
end

#server_poolObject (readonly) Also known as: servers

Returns the value of attribute server_pool.



103
104
105
# File 'lib/nats/io/client.rb', line 103

# Reader for @server_pool (an empty array initially). Also available as `servers`.
def server_pool
  @server_pool
end

#statsObject (readonly)

Returns the value of attribute stats.



103
104
105
# File 'lib/nats/io/client.rb', line 103

# Reader for @stats, the accounting hash with the keys
# :in_msgs, :out_msgs, :in_bytes, :out_bytes and :reconnects.
def stats
  @stats
end

#statusObject (readonly)

Returns the value of attribute status.



103
104
105
# File 'lib/nats/io/client.rb', line 103

# Reader for @status: nil initially, otherwise compared against the Status
# constants by the predicate methods (#connected?, #closed?, ...).
def status
  @status
end

#subscription_executorObject (readonly)

Returns the value of attribute subscription_executor.



103
104
105
# File 'lib/nats/io/client.rb', line 103

# Reader for @subscription_executor.
# NOTE(review): the constructor does not assign this variable, so it reads as nil until set elsewhere.
def subscription_executor
  @subscription_executor
end

#uriObject (readonly)

Returns the value of attribute uri.



103
104
105
# File 'lib/nats/io/client.rb', line 103

# Reader for @uri, the URI of the server to which we are currently connected (nil initially).
def uri
  @uri
end

Class Method Details

.after_forkObject

Re-establish connection in a new process after forking to start new threads.



136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/nats/io/client.rb', line 136

# Re-establishes connections in a new process after forking, to start new threads.
#
# For every client tracked in INSTANCES:
# - when its :reconnect option is set, the connection is closed with the
#   DISCONNECTED status and, if the client was not disconnected beforehand,
#   #connect is called again;
# - otherwise its error callback is invoked with NATS::IO::ForkDetectedError
#   and the client is closed.
#
# An error raised while handling one client is reported through `warn` and
# does not prevent the remaining clients from being handled.
def after_fork
  INSTANCES.each do |client|
    if client.options[:reconnect]
      was_connected = !client.disconnected?
      client.send(:close_connection, Status::DISCONNECTED, true)
      client.connect if was_connected
    else
      # Report the affected client itself; `self` here is the Client class,
      # not a connection.
      client.send(:err_cb_call, client, NATS::IO::ForkDetectedError, nil)
      client.close
    end
  rescue => e
    warn "nats: Error during handling after_fork callback: #{e}" # TODO: Report as async error via error callback?
  end
end

Instance Method Details

#closeObject

Closes the connection to NATS, flushing first if the connection is alive and there are pending messages. It should not be called while holding the lock.



748
749
750
# File 'lib/nats/io/client.rb', line 748

# Closes the connection by calling close_connection with the CLOSED status.
# NOTE(review): the second argument (`true`) presumably requests flushing of
# pending messages — confirm against close_connection.
def close
  close_connection(CLOSED, true)
end

#closed?Boolean

Returns:

  • (Boolean)


778
779
780
# File 'lib/nats/io/client.rb', line 778

# @return [Boolean] true when the current status is CLOSED
def closed?
  @status == CLOSED
end

#connect(uri = nil, opts = {}) ⇒ Object

Prepare connecting to NATS, but postpone real connection until first usage.



252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
# File 'lib/nats/io/client.rb', line 252

# Parses and validates the options and establishes the connection, unless
# connect has been called before.
#
# When a uri or a non-empty opts hash is given, they replace the initial
# uri/options remembered by the constructor.
#
# @param uri [Object, nil] URI to remember as the initial URI
# @param opts [Hash] options to remember as the initial options
# @return [Client] always the client itself, so calls can be chained whether
#   or not this call was the one that performed the connection
def connect(uri=nil, opts={})
  if uri || opts.any?
    @initial_uri = uri
    @initial_options = opts
  end

  # Check and set the flag atomically so that only one caller connects.
  already_called = synchronize do
    previously_called = @connect_called
    @connect_called = true
    previously_called
  end

  # In case it has been connected already, then do not need to do this again.
  return self if already_called

  parse_and_validate_options
  establish_connection!

  self
end

#connected?Boolean

Returns:

  • (Boolean)


766
767
768
# File 'lib/nats/io/client.rb', line 766

# @return [Boolean] true when the current status is CONNECTED
def connected?
  @status == CONNECTED
end

#connecting?Boolean

Returns:

  • (Boolean)


770
771
772
# File 'lib/nats/io/client.rb', line 770

# @return [Boolean] true when the current status is CONNECTING
def connecting?
  @status == CONNECTING
end

#disconnected?Boolean

Returns:

  • (Boolean)


762
763
764
# File 'lib/nats/io/client.rb', line 762

# A client counts as disconnected when it has no status yet
# or when its status is DISCONNECTED.
#
# @return [Boolean]
def disconnected?
  !@status || @status == DISCONNECTED
end

#discovered_serversObject

discovered_servers returns the NATS Servers that have been discovered via INFO protocol updates.



741
742
743
# File 'lib/nats/io/client.rb', line 741

# Lists the servers of the pool whose :discovered entry is truthy.
#
# @return [Array] the matching entries of `servers`
def discovered_servers
  servers.select do |server|
    server[:discovered]
  end
end

#drainObject

drain will put a connection into a drain state. All subscriptions will immediately be put into a drain state. Upon completion, the publishers will be drained and cannot publish any additional messages. Upon draining of the publishers, the connection will be closed. Use the `on_close` callback option to know when the connection has moved from draining to closed.



822
823
824
825
826
827
828
# File 'lib/nats/io/client.rb', line 822

# Starts draining the connection in a background thread running do_drain.
# Does nothing when the connection is already draining; the thread is
# created at most once.
#
# @return [Thread, nil] the drain thread, or nil when already draining
def drain
  unless draining?
    synchronize { @drain_t ||= Thread.new { do_drain } }
  end
end

#draining?Boolean

Returns:

  • (Boolean)


782
783
784
785
786
787
788
789
790
791
792
793
# File 'lib/nats/io/client.rb', line 782

# Tells whether the connection is draining: either the status is one of the
# two draining statuses, or a drain thread has already been created.
#
# @return [Boolean]
def draining?
  return true if @status == DRAINING_PUBS || @status == DRAINING_SUBS

  # Otherwise look, under the lock, for an existing drain thread.
  synchronize { @drain_t ? true : false }
end

#flush(timeout = 10) ⇒ Object

Send a ping and wait for a pong back within a timeout.



721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
# File 'lib/nats/io/client.rb', line 721

# Sends a PING and blocks until the matching PONG is signaled, raising a
# timeout when the wait exceeds the deadline.
#
# @param timeout [Numeric] seconds to wait for the PONG
def flush(timeout=10)
  pong_cond = @pongs.new_cond

  @pongs.synchronize do
    # Register the condition before the PING is queued.
    @pongs << pong_cond

    @pending_queue << PING_REQUEST
    @flush_queue << :ping

    MonotonicTime.with_nats_timeout(timeout) { pong_cond.wait(timeout) }
  end
end

#jetstream(opts = {}) ⇒ NATS::JetStream Also known as: JetStream, jsm

Create a JetStream context.

Parameters:

  • opts (Hash) (defaults to: {})

    Options to customize the JetStream context.

  • params (Hash)

    a customizable set of options

Returns:



836
837
838
# File 'lib/nats/io/client.rb', line 836

# Creates a JetStream context for this client.
#
# @param opts [Hash] options passed unchanged to NATS::JetStream.new
# @return [NATS::JetStream]
def jetstream(opts={})
  ::NATS::JetStream.new(self, opts)
end

#last_errorObject



811
812
813
814
815
# File 'lib/nats/io/client.rb', line 811

# Returns the sticky last error (nil when none), read under the client lock.
def last_error
  synchronize { @last_err }
end

#new_inboxString

new_inbox returns a unique inbox used for subscriptions.

Returns:

  • (String)


754
755
756
# File 'lib/nats/io/client.rb', line 754

# Builds a unique inbox subject: the inbox prefix, a dot, and a fresh token
# taken from the NUID generator.
#
# @return [String]
def new_inbox
  token = @nuid.next
  "#{@inbox_prefix}.#{token}"
end

#old_request(subject, payload, opts = {}, &blk) ⇒ Object

Sends a request creating an ephemeral subscription for the request, expecting a single response or raising a timeout in case the request is not retrieved within the specified deadline. If given a callback, then the request happens asynchronously.



657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
# File 'lib/nats/io/client.rb', line 657

# Sends a request using an ephemeral subscription on a fresh inbox.
#
# With a block, the request is asynchronous: the block receives the replies
# (its arity decides which parts of the message it gets) and the
# subscription is returned. Without a block, the call waits for a single
# reply up to opts[:timeout] seconds (0.5 by default).
#
# Note that opts is modified: :max and, in the synchronous case, :timeout
# are written into it.
#
# @return [Object, nil] the subscription (async), the response (sync), or
#   nil when subject is nil/false
# @raise [NATS::IO::NoRespondersError] when the reply carries status "503"
def old_request(subject, payload, opts={}, &blk)
  return unless subject

  reply_inbox = new_inbox

  # Asynchronous flavour: hand every reply to the callback and return the subscription.
  if blk
    opts[:max] ||= 1
    async_sub = subscribe(reply_inbox, opts) do |msg|
      case blk.arity
      when 0
        blk.call
      when 1
        blk.call(msg)
      when 2
        blk.call(msg.data, msg.reply)
      when 3
        blk.call(msg.data, msg.reply, msg.subject)
      else
        blk.call(msg.data, msg.reply, msg.subject, msg.header)
      end
    end
    publish(subject, payload, reply_inbox)

    return async_sub
  end

  # Synchronous flavour: a single response, bounded by a timeout.
  timeout = opts[:timeout] ||= 0.5
  opts[:max] = 1

  sub = Subscription.new
  sub.subject = reply_inbox
  sub.received = 0
  reply_cond = sub.new_cond
  sub.future = reply_cond
  sub.nc = self

  # Allocate the subscription id and register the subscription under the lock.
  sid = synchronize do
    next_sid = (@ssid += 1)
    sub.sid = next_sid
    @subs[next_sid] = sub
    next_sid
  end

  send_command("SUB #{reply_inbox} #{sid}#{CR_LF}")
  @flush_queue << :sub
  # Auto-unsubscribe after one message.
  unsubscribe(sub, 1)

  sub.synchronize do
    # Publish while holding the subscription lock, then wait for the reply.
    publish(subject, payload, reply_inbox)

    MonotonicTime.with_nats_timeout(timeout) { reply_cond.wait(timeout) }
  end
  response = sub.response

  no_responders = response && response.header && response.header[STATUS_HDR] == "503"
  raise NATS::IO::NoRespondersError if no_responders

  response
end

#on_close(&callback) ⇒ Object



807
808
809
# File 'lib/nats/io/client.rb', line 807

# Stores the given block as the close callback, replacing the current one
# (a no-op proc by default).
# NOTE(review): without a block this stores nil — confirm the code invoking
# @close_cb tolerates that.
def on_close(&callback)
  @close_cb = callback
end

#on_disconnect(&callback) ⇒ Object



799
800
801
# File 'lib/nats/io/client.rb', line 799

# Stores the given block as the disconnect callback, replacing the current
# one (a no-op proc by default).
# NOTE(review): without a block this stores nil — confirm the code invoking
# @disconnect_cb tolerates that.
def on_disconnect(&callback)
  @disconnect_cb = callback
end

#on_error(&callback) ⇒ Object



795
796
797
# File 'lib/nats/io/client.rb', line 795

# Stores the given block as the error callback, replacing the current one
# (a no-op proc by default).
# NOTE(review): without a block this stores nil — confirm the code invoking
# @err_cb tolerates that.
def on_error(&callback)
  @err_cb = callback
end

#on_reconnect(&callback) ⇒ Object



803
804
805
# File 'lib/nats/io/client.rb', line 803

# Stores the given block as the reconnect callback, replacing the current
# one (a no-op proc by default).
# NOTE(review): without a block this stores nil — confirm the code invoking
# @reconnect_cb tolerates that.
def on_reconnect(&callback)
  @reconnect_cb = callback
end

#publish(subject, msg = EMPTY_MSG, opt_reply = nil, **options, &blk) ⇒ Object



452
453
454
455
456
457
458
459
460
461
462
463
464
465
# File 'lib/nats/io/client.rb', line 452

# Publishes a message on the given subject.
#
# When options[:header] is present the message is wrapped in a NATS::Msg and
# sent through #publish_msg; otherwise a PUB command is queued directly.
#
# @param subject [String] destination subject
# @param msg [String, nil] payload; nil is treated as an empty payload,
#   matching what #publish_msg does for nil data
# @param opt_reply [String, nil] optional reply subject
# @param options [Hash] only :header is read here
# @raise [NATS::IO::BadSubject] when subject is nil/false or empty
def publish(subject, msg=EMPTY_MSG, opt_reply=nil, **options, &blk)
  raise NATS::IO::BadSubject if !subject || subject.empty?
  if options[:header]
    return publish_msg(NATS::Msg.new(subject: subject, data: msg, reply: opt_reply, header: options[:header]))
  end

  # An explicit nil payload would otherwise fail on nil.bytesize.
  msg ||= EMPTY_MSG

  # Accounting
  msg_size = msg.bytesize
  @stats[:out_msgs] += 1
  @stats[:out_bytes] += msg_size

  send_command("PUB #{subject} #{opt_reply} #{msg_size}\r\n#{msg}\r\n")
  # Wake the flusher only when no flush is pending already.
  @flush_queue << :pub if @flush_queue.empty?
end

#publish_msg(msg) ⇒ Object

Publishes a NATS::Msg that may include headers.

Raises:

  • (TypeError)


468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
# File 'lib/nats/io/client.rb', line 468

# Publishes a NATS::Msg, using HPUB when the message has headers and PUB
# otherwise. Nil reply/data on the message are replaced with empty strings
# (the message object is modified).
#
# @param msg [NATS::Msg] message to publish
# @raise [TypeError] when msg is not a Msg
# @raise [NATS::IO::BadSubject] when the subject is nil/false or empty
def publish_msg(msg)
  unless msg.is_a?(Msg)
    raise TypeError, "nats: expected NATS::Msg, got #{msg.class.name}"
  end
  raise NATS::IO::BadSubject if !msg.subject || msg.subject.empty?

  msg.reply ||= ''
  msg.data ||= ''
  payload_size = msg.data.bytesize

  # Accounting (payload bytes only).
  @stats[:out_msgs] += 1
  @stats[:out_bytes] += payload_size

  command =
    if msg.header
      # Header block: version line, one "key: value" line per header, blank line.
      header_block = NATS_HDR_LINE.dup
      msg.header.each { |key, value| header_block << "#{key}: #{value}#{CR_LF}" }
      header_block << CR_LF
      header_size = header_block.bytesize

      "HPUB #{msg.subject} #{msg.reply} #{header_size} #{payload_size + header_size}\r\n#{header_block}#{msg.data}\r\n"
    else
      "PUB #{msg.subject} #{msg.reply} #{payload_size}\r\n#{msg.data}\r\n"
    end
  send_command(command)

  # Wake the flusher only when no flush is pending already.
  @flush_queue << :pub if @flush_queue.empty?
end

#reconnecting?Boolean

Returns:

  • (Boolean)


774
775
776
# File 'lib/nats/io/client.rb', line 774

# @return [Boolean] true when the current status is RECONNECTING
def reconnecting?
  @status == RECONNECTING
end

#request(subject, payload = "", **opts, &blk) ⇒ Object

Sends a request expecting a single response, using a single subscription per connection for receiving the responses. It times out if no response is received within the specified deadline. If given a callback, then the request happens asynchronously.



542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
# File 'lib/nats/io/client.rb', line 542

# Sends a request and waits for a single reply on the shared response
# subscription of this connection.
#
# Delegates to #old_request when a block is given or opts[:old_style] is
# set, and to #request_msg when opts[:header] is set.
#
# @param subject [String] subject to send the request to
# @param payload [String] request payload
# @param opts [Hash] :timeout (seconds, 0.5 by default), :old_style, :header
# @return [Object] the response stored for this request
# @raise [NATS::IO::BadSubject] when subject is nil/false or empty
# @raise [NATS::Timeout] re-raised after the pending request entry is removed
# @raise [NATS::IO::NoRespondersError] when the reply carries status "503"
def request(subject, payload="", **opts, &blk)
  raise NATS::IO::BadSubject if !subject || subject.empty?

  # A block, or an explicit request for it, means one subscription per request.
  return old_request(subject, payload, opts, &blk) if blk
  return old_request(subject, payload, opts) if opts[:old_style]

  if opts[:header]
    request_with_header = NATS::Msg.new(subject: subject, data: payload, header: opts[:header])
    return request_msg(request_with_header, **opts)
  end

  timeout = opts[:timeout] ||= 0.5

  token, inbox, future = synchronize do
    start_resp_mux_sub! unless @resp_sub_prefix

    # Token identifying this request within the shared response subscription.
    request_token = @nuid.next

    # Condition that gets signaled when the reply for this token arrives.
    reply_cond = @resp_sub.new_cond
    @resp_map[request_token][:future] = reply_cond

    [request_token, "#{@resp_sub_prefix}.#{request_token}", reply_cond]
  end

  # Publish request and wait for reply.
  publish(subject, payload, inbox)
  begin
    MonotonicTime.with_nats_timeout(timeout) do
      @resp_sub.synchronize { future.wait(timeout) }
    end
  rescue NATS::Timeout
    # Forget the pending request before propagating the timeout.
    synchronize { @resp_map.delete(token) }
    raise
  end

  # Take whatever response was stored for the token and drop the entry.
  response = synchronize do
    entry = @resp_map[token]
    reply = entry[:response]
    @resp_map.delete(token)
    reply
  end

  no_responders = response && response.header && response.header[STATUS_HDR] == "503"
  raise NATS::IO::NoRespondersError if no_responders

  response
end

#request_msg(msg, **opts) ⇒ Object

request_msg makes a NATS request using a NATS::Msg that may include headers.

Raises:

  • (TypeError)


600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
# File 'lib/nats/io/client.rb', line 600

# Makes a request using a NATS::Msg that may include headers, waiting for a
# single reply on the shared response subscription of this connection.
#
# The message is modified: its reply is set to the inbox of this request and
# nil data is replaced with an empty string.
#
# @param msg [NATS::Msg] request message
# @param opts [Hash] :timeout in seconds (0.5 by default)
# @return [Object] the response stored for this request
# @raise [TypeError] when msg is not a Msg
# @raise [NATS::IO::BadSubject] when the subject is nil/false or empty
# @raise [NATS::Timeout] re-raised after the pending request entry is removed
# @raise [NATS::IO::NoRespondersError] when the reply carries status "503"
def request_msg(msg, **opts)
  raise TypeError, "nats: expected NATS::Msg, got #{msg.class.name}" unless msg.is_a?(Msg)
  raise NATS::IO::BadSubject if !msg.subject || msg.subject.empty?

  token = nil
  inbox = nil
  future = nil
  response = nil
  timeout = opts[:timeout] ||= 0.5
  synchronize do
    start_resp_mux_sub! unless @resp_sub_prefix

    # Create token for this request.
    token = @nuid.next
    inbox = "#{@resp_sub_prefix}.#{token}"

    # Create a future for the request that will
    # get signaled when the reply is received.
    future = @resp_sub.new_cond
    @resp_map[token][:future] = future
  end
  msg.reply = inbox
  msg.data ||= ''

  # Publish request and wait for reply.
  publish_msg(msg)
  begin
    MonotonicTime::with_nats_timeout(timeout) do
      @resp_sub.synchronize do
        future.wait(timeout)
      end
    end
  rescue NATS::Timeout => e
    # Forget the pending request before propagating the timeout.
    synchronize { @resp_map.delete(token) }
    raise e
  end

  # Check if there is a response already.
  synchronize do
    result = @resp_map[token]
    response = result[:response]
    @resp_map.delete(token)
  end

  if response && response.header
    status = response.header[STATUS_HDR]
    raise NATS::IO::NoRespondersError if status == "503"
  end

  response
end

#subscribe(subject, opts = {}, &callback) ⇒ Object

Creates a subscription whose messages are dispatched asynchronously to a callback.



499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
# File 'lib/nats/io/client.rb', line 499

# Creates a subscription on the given subject and queues the SUB command.
#
# Note that opts is modified: the pending limits are written into it when
# they are missing.
#
# @param subject [String] subject to subscribe to
# @param opts [Hash] :queue, :max, :pending_msgs_limit, :pending_bytes_limit,
#   :processing_concurrency
# @param callback [Proc] optional handler stored on the subscription; without
#   one, the subscription gets a condition in wait_for_msgs_cond instead
# @return [Subscription]
# @raise [NATS::IO::ConnectionDrainingError] when the connection is draining
def subscribe(subject, opts={}, &callback)
  raise NATS::IO::ConnectionDrainingError, "nats: connection draining" if draining?

  # Allocate the subscription id and register the subscription under the lock.
  sid, sub = synchronize do
    next_sid = (@ssid += 1)
    subscription = Subscription.new
    @subs[next_sid] = subscription
    subscription.nc = self
    subscription.sid = next_sid
    [next_sid, subscription]
  end

  opts[:pending_msgs_limit]  ||= NATS::IO::DEFAULT_SUB_PENDING_MSGS_LIMIT
  opts[:pending_bytes_limit] ||= NATS::IO::DEFAULT_SUB_PENDING_BYTES_LIMIT

  queue_group = opts[:queue]
  max_msgs = opts[:max]

  sub.subject = subject
  sub.callback = callback
  sub.received = 0
  sub.queue = queue_group if queue_group
  sub.max = max_msgs if max_msgs
  sub.pending_msgs_limit  = opts[:pending_msgs_limit]
  sub.pending_bytes_limit = opts[:pending_bytes_limit]
  sub.pending_queue = SizedQueue.new(sub.pending_msgs_limit)
  if opts.key?(:processing_concurrency)
    sub.processing_concurrency = opts[:processing_concurrency]
  end

  send_command("SUB #{subject} #{queue_group} #{sid}#{CR_LF}")
  @flush_queue << :sub

  # Setup server support for auto-unsubscribe when receiving enough messages
  sub.unsubscribe(max_msgs) if max_msgs

  # Without a callback, give the subscription a condition to wait for messages on.
  sub.wait_for_msgs_cond = sub.new_cond unless callback

  sub
end