Class: NATS::Client

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin, Status
Defined in:
lib/nats/io/client.rb

Overview

Client creates a connection to the NATS Server.

Constant Summary collapse

DEFAULT_PORT =
{nats: 4222, ws: 80, wss: 443}.freeze
DEFAULT_URI =
"nats://localhost:#{DEFAULT_PORT[:nats]}".freeze
CR_LF =
"\r\n"
CR_LF_SIZE =
CR_LF.bytesize
PING_REQUEST =
"PING#{CR_LF}".freeze
PONG_RESPONSE =
"PONG#{CR_LF}".freeze
NATS_HDR_LINE =
"NATS/1.0#{CR_LF}".freeze
STATUS_MSG_LEN =
3
STATUS_HDR =
"Status"
DESC_HDR =
"Description"
NATS_HDR_LINE_SIZE =
NATS_HDR_LINE.bytesize
SUB_OP =
"SUB"
EMPTY_MSG =
""

Constants included from Status

Status::CLOSED, Status::CONNECTED, Status::CONNECTING, Status::DISCONNECTED, Status::DRAINING_PUBS, Status::DRAINING_SUBS, Status::RECONNECTING

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(uri = nil, opts = {}) ⇒ Client

Returns a new instance of Client.



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# File 'lib/nats/io/client.rb', line 155

# Stores the given URI and options and resets every piece of internal client
# state (IO handle, queues, parser, threads, subscriptions, accounting,
# callbacks, auth settings) to its default value.
#
# @param uri [Object, nil] kept as @initial_uri exactly as given
# @param opts [Hash] kept as @initial_options exactly as given; the
#   +:reloader+ key is read here and falls back to the class-level
#   default_reloader when absent
def initialize(uri = nil, opts = {})
  super() # required to initialize monitor
  @initial_uri = uri
  @initial_options = opts

  # Read/Write IO
  @io = nil

  # Queues for coalescing writes of commands we need to send to server.
  @flush_queue = nil
  @pending_queue = nil

  # Parser with state
  @parser = NATS::Protocol::Parser.new(self)

  # Threads for both reading and flushing command
  @flusher_thread = nil
  @read_loop_thread = nil
  @ping_interval_thread = nil

  # Info that we get from the server
  @server_info = {}

  # URI from server to which we are currently connected
  @uri = nil
  @server_pool = []

  @status = nil

  # Subscriptions
  @subs = {}
  @ssid = 0

  # Ping interval
  @pings_outstanding = 0
  @pongs_received = 0
  @pongs = []
  @pongs.extend(MonitorMixin)

  # Accounting
  @pending_size = 0
  @stats = {
    in_msgs: 0,
    out_msgs: 0,
    in_bytes: 0,
    out_bytes: 0,
    reconnects: 0
  }

  # Sticky error
  @last_err = nil

  # Async callbacks, no ops by default.
  @err_cb = proc {}
  @close_cb = proc {}
  @disconnect_cb = proc {}
  @reconnect_cb = proc {}

  # Secure TLS options
  @tls = nil

  # Hostname of current server; used for when TLS host
  # verification is enabled.
  @hostname = nil
  @single_url_connect_used = false

  # Track whether connect has already been called.
  @connect_called = false

  # New style request/response implementation.
  @resp_sub = nil
  @resp_map = nil
  @resp_sub_prefix = nil
  @nuid = NATS::NUID.new

  # NKEYS
  @user_credentials = nil
  @nkeys_seed = nil
  @user_nkey_cb = nil
  @user_jwt_cb = nil
  @signature_cb = nil

  # Tokens
  @auth_token = nil

  @inbox_prefix = "_INBOX"

  # Draining
  @drain_t = nil

  # Service API
  @_services = nil

  # Prepare for calling connect or automatic delayed connection
  parse_and_validate_options if uri || opts.any?

  # Keep track of all client instances to handle them after process forking in Ruby 3.1+
  INSTANCES[self] = self if !defined?(Ractor) || Ractor.current == Ractor.main # Ractors don't work in forked processes

  @reloader = opts.fetch(:reloader, self.class.default_reloader)
end

Class Attribute Details

.default_reloaderObject



132
133
134
# File 'lib/nats/io/client.rb', line 132

# Returns the memoized default reloader: a proc that just calls the block it
# is given. On first creation the proc is passed through
# Ractor.make_shareable when Ractor is defined.
def default_reloader
  @default_reloader ||= proc { |&block| block.call }.tap { |r| Ractor.make_shareable(r) if defined? Ractor }
end

Instance Attribute Details

#optionsObject (readonly)

Returns the value of attribute options.



104
105
106
# File 'lib/nats/io/client.rb', line 104

# Read-only accessor.
#
# @return [Object] the current value of @options
def options
  @options
end

#reloaderObject (readonly)

Returns the value of attribute reloader.



104
105
106
# File 'lib/nats/io/client.rb', line 104

# Read-only accessor.
#
# @return [Object] the current value of @reloader
def reloader
  @reloader
end

#server_infoObject (readonly)

Returns the value of attribute server_info.



104
105
106
# File 'lib/nats/io/client.rb', line 104

# Read-only accessor.
#
# @return [Object] the current value of @server_info
def server_info
  @server_info
end

#server_poolObject (readonly) Also known as: servers

Returns the value of attribute server_pool.



104
105
106
# File 'lib/nats/io/client.rb', line 104

# Read-only accessor.
#
# @return [Object] the current value of @server_pool
def server_pool
  @server_pool
end

#statsObject (readonly)

Returns the value of attribute stats.



104
105
106
# File 'lib/nats/io/client.rb', line 104

# Read-only accessor. The returned object is the live @stats value itself,
# not a copy.
#
# @return [Object] the current value of @stats
def stats
  @stats
end

#statusObject (readonly)

Returns the value of attribute status.



104
105
106
# File 'lib/nats/io/client.rb', line 104

# Read-only accessor.
#
# @return [Object] the current value of @status
def status
  @status
end

#subscription_executorObject (readonly)

Returns the value of attribute subscription_executor.



104
105
106
# File 'lib/nats/io/client.rb', line 104

# Read-only accessor.
#
# @return [Object] the current value of @subscription_executor
def subscription_executor
  @subscription_executor
end

#uriObject (readonly)

Returns the value of attribute uri.



104
105
106
# File 'lib/nats/io/client.rb', line 104

# Read-only accessor.
#
# @return [Object] the current value of @uri
def uri
  @uri
end

Class Method Details

.after_forkObject

Re-establish connection in a new process after forking to start new threads.



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/nats/io/client.rb', line 137

# Walks over every tracked client after the process has been forked.
#
# Closed clients are skipped. A client whose +:reconnect+ option is set has
# close_connection called with Status::DISCONNECTED and is connected again
# if it was not disconnected beforehand. Any other client has
# NATS::IO::ForkDetectedError handed to its err_cb_call and is then closed.
# An error raised while handling one client is written out with +warn+ and
# does not stop the remaining clients from being processed.
def after_fork
  INSTANCES.each do |client|
    next if client.closed?

    if client.options[:reconnect]
      was_connected = !client.disconnected?
      client.send(:close_connection, Status::DISCONNECTED, true)
      client.connect if was_connected
    else
      # Report against the client being handled: inside this class-level
      # method `self` is the receiver of after_fork, not that client.
      client.send(:err_cb_call, client, NATS::IO::ForkDetectedError, nil)
      client.close
    end
  rescue => e
    warn "nats: Error during handling after_fork callback: #{e}" # TODO: Report as async error via error callback?
  end
end

Instance Method Details

#closeObject

Closes the connection to NATS, flushing first if the connection is alive and there are pending messages. It should not be called while holding the lock.



768
769
770
# File 'lib/nats/io/client.rb', line 768

# Closes the client by delegating to close_connection with the CLOSED
# status. NOTE(review): the second argument (+true+) is presumably the
# "flush pending data first" flag — confirm against close_connection.
def close
  close_connection(CLOSED, true)
end

#closed?Boolean

Returns:

  • (Boolean)


798
799
800
# File 'lib/nats/io/client.rb', line 798

# @return [Boolean] whether @status equals CLOSED
def closed?
  @status == CLOSED
end

#connect(uri = nil, opts = {}) ⇒ Object

Prepare connecting to NATS, but postpone real connection until first usage.



258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/nats/io/client.rb', line 258

# Connects the client once: the first call parses/validates the options and
# calls establish_connection!; every later call is a no-op.
#
# When a URI or a non-empty options hash is given, they replace the values
# stored at construction time before anything else happens.
#
# @param uri [Object, nil] replaces @initial_uri when uri or opts is given
# @param opts [Hash] replaces @initial_options when uri or opts is given
# @return [self] always the client itself, including on repeated calls
def connect(uri = nil, opts = {})
  if uri || opts.any?
    @initial_uri = uri
    @initial_options = opts
  end

  # Test-and-set the flag under the lock so only one caller proceeds.
  already_called = false
  synchronize do
    already_called = @connect_called
    @connect_called = true
  end
  # In case it has been connected already, there is nothing more to do, but
  # still hand back the client so the return value does not depend on
  # whether this was the first call.
  return self if already_called

  parse_and_validate_options
  establish_connection!

  self
end

#connected?Boolean

Returns:

  • (Boolean)


786
787
788
# File 'lib/nats/io/client.rb', line 786

# @return [Boolean] whether @status equals CONNECTED
def connected?
  @status == CONNECTED
end

#connected_serverObject



778
779
780
# File 'lib/nats/io/client.rb', line 778

# The value of @uri while the client reports itself as connected,
# otherwise nil.
def connected_server
  return nil unless connected?

  @uri
end

#connecting?Boolean

Returns:

  • (Boolean)


790
791
792
# File 'lib/nats/io/client.rb', line 790

# @return [Boolean] whether @status equals CONNECTING
def connecting?
  @status == CONNECTING
end

#disconnected?Boolean

Returns:

  • (Boolean)


782
783
784
# File 'lib/nats/io/client.rb', line 782

# @return [Boolean] true when no status has been set yet (nil/false) or
#   when @status equals DISCONNECTED
def disconnected?
  !@status || @status == DISCONNECTED
end

#discovered_serversObject

discovered_servers returns the NATS Servers that have been discovered via INFO protocol updates.



761
762
763
# File 'lib/nats/io/client.rb', line 761

# Selects the entries of +servers+ whose +:discovered+ key is truthy.
def discovered_servers
  servers.select { |s| s[:discovered] }
end

#drainObject

drain will put a connection into a drain state. All subscriptions will immediately be put into a drain state. Upon completion, the publishers will be drained and can not publish any additional messages. Upon draining of the publishers, the connection will be closed. Use the `on_close` callback option to know when the connection has moved from draining to closed.



842
843
844
845
846
847
848
# File 'lib/nats/io/client.rb', line 842

# Starts draining in a background thread unless a drain is already under
# way. The thread running do_drain is created at most once, under the
# client lock, and kept in @drain_t.
#
# @return [Thread, nil] the drain thread, or nil when already draining
def drain
  unless draining?
    synchronize { @drain_t ||= Thread.new { do_drain } }
  end
end

#draining?Boolean

Returns:

  • (Boolean)


802
803
804
805
806
807
808
809
810
811
812
813
# File 'lib/nats/io/client.rb', line 802

# @return [Boolean] true when @status is DRAINING_PUBS or DRAINING_SUBS, or
#   when a drain thread (@drain_t) has been set; false otherwise
def draining?
  return true if @status == DRAINING_PUBS || @status == DRAINING_SUBS

  # @drain_t is read under the client lock.
  drain_started = false
  synchronize { drain_started = !!@drain_t }
  drain_started
end

#flush(timeout = 10) ⇒ Object

Send a ping and wait for a pong back within a timeout.



741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
# File 'lib/nats/io/client.rb', line 741

# Queues a PING for the server and blocks on a condition variable that was
# registered in @pongs for it.
#
# @param timeout [Numeric] handed both to MonotonicTime.with_nats_timeout
#   and to the condition wait (defaults to 10; presumably seconds — confirm)
def flush(timeout = 10)
  # Schedule sending a PING, and block until we receive PONG back,
  # or raise a timeout in case the response is past the deadline.
  pong = @pongs.new_cond
  @pongs.synchronize do
    # Register the waiter before the PING is queued.
    @pongs << pong

    # Flush once pong future has been prepared
    @pending_queue << PING_REQUEST
    @flush_queue << :ping
    # The wait happens while holding the @pongs monitor.
    MonotonicTime.with_nats_timeout(timeout) do
      pong.wait(timeout)
    end
  end
end

#force_reconnectObject



276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/nats/io/client.rb', line 276

# Triggers a reconnect under the client lock. Nothing is done when a
# reconnect is already in progress.
#
# @return [true]
# @raise [NATS::IO::ConnectionClosedError] when the client is closed,
#   draining or disconnected
def force_reconnect
  synchronize do
    unless reconnecting?
      raise NATS::IO::ConnectionClosedError if closed? || draining? || disconnected?

      initiate_reconnect
    end

    true
  end
end

#jetstream(opts = {}) ⇒ NATS::JetStream Also known as: JetStream, jsm

Create a JetStream context.

Parameters:

  • opts (Hash) (defaults to: {})

    Options to customize the JetStream context.

  • params (Hash)

    a customizable set of options

Returns:



856
857
858
# File 'lib/nats/io/client.rb', line 856

# Builds a new ::NATS::JetStream for this client.
#
# @param opts [Hash] passed through unchanged to ::NATS::JetStream.new
# @return [NATS::JetStream]
def jetstream(opts = {})
  ::NATS::JetStream.new(self, opts)
end

#last_errorObject



831
832
833
834
835
# File 'lib/nats/io/client.rb', line 831

# Reads @last_err while holding the client lock.
#
# @return [Object, nil] the current value of @last_err
def last_error
  synchronize { @last_err }
end

#new_inboxString

new_inbox returns a unique inbox used for subscriptions.

Returns:

  • (String)


774
775
776
# File 'lib/nats/io/client.rb', line 774

# Builds an inbox subject from @inbox_prefix and the next value produced by
# @nuid, joined with a dot.
#
# @return [String]
def new_inbox
  unique_part = @nuid.next
  "#{@inbox_prefix}.#{unique_part}"
end

#old_request(subject, payload, opts = {}, &blk) ⇒ Object

Sends a request creating an ephemeral subscription for the request, expecting a single response or raising a timeout in case the request is not retrieved within the specified deadline. If given a callback, then the request happens asynchronously.



677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
# File 'lib/nats/io/client.rb', line 677

# Sends a request on a freshly generated inbox.
#
# With a block, a subscription limited to +:max+ messages (1 unless given)
# is created on the inbox, the request is published and the subscription is
# returned straight away; the block is invoked with arguments chosen by its
# arity (nothing / msg / data, reply / data, reply, subject / data, reply,
# subject, header).
#
# Without a block, a one-shot subscription is registered, the request is
# published and the call waits up to +:timeout+ for the reply.
#
# The caller's +opts+ hash is never modified.
#
# @param subject [String, nil] request subject; nil makes the call return nil
# @param payload [Object] request payload, handed to publish as is
# @param opts [Hash] +:timeout+ (default 0.5) for the blocking form; for the
#   block form the options are forwarded to subscribe
# @return [Object, nil] the subscription (block form) or the response
# @raise [NATS::IO::NoRespondersError] when the response carries a "503"
#   status header
def old_request(subject, payload, opts = {}, &blk)
  return unless subject
  inbox = new_inbox

  # If a callback was passed, then have it process
  # the messages asynchronously and return the sid.
  if blk
    # Work on a copy so the defaults are not written into the caller's hash.
    sub_opts = opts.dup
    sub_opts[:max] ||= 1
    s = subscribe(inbox, sub_opts) do |msg|
      case blk.arity
      when 0 then blk.call
      when 1 then blk.call(msg)
      when 2 then blk.call(msg.data, msg.reply)
      when 3 then blk.call(msg.data, msg.reply, msg.subject)
      else blk.call(msg.data, msg.reply, msg.subject, msg.header)
      end
    end
    publish(subject, payload, inbox)

    return s
  end

  # In case block was not given, handle synchronously
  # with a timeout and only allow a single response.
  timeout = opts[:timeout] || 0.5

  sub = Subscription.new
  sub.subject = inbox
  sub.received = 0
  future = sub.new_cond
  sub.future = future
  sub.nc = self

  sid = nil
  synchronize do
    sid = (@ssid += 1)
    sub.sid = sid
    @subs[sid] = sub
  end

  send_command("SUB #{inbox} #{sid}#{CR_LF}")
  @flush_queue << :sub
  # Auto-unsubscribe after a single message.
  unsubscribe(sub, 1)

  sub.synchronize do
    # Publish the request and then wait for the response...
    publish(subject, payload, inbox)

    MonotonicTime.with_nats_timeout(timeout) do
      future.wait(timeout)
    end
  end
  response = sub.response

  if response&.header
    status = response.header[STATUS_HDR]
    raise NATS::IO::NoRespondersError if status == "503"
  end

  response
end

#on_close(&callback) ⇒ Object



827
828
829
# File 'lib/nats/io/client.rb', line 827

# Stores the given block in @close_cb, replacing the previous callback.
# Calling it without a block stores nil.
def on_close(&callback)
  @close_cb = callback
end

#on_disconnect(&callback) ⇒ Object



819
820
821
# File 'lib/nats/io/client.rb', line 819

# Stores the given block in @disconnect_cb, replacing the previous callback.
# Calling it without a block stores nil.
def on_disconnect(&callback)
  @disconnect_cb = callback
end

#on_error(&callback) ⇒ Object



815
816
817
# File 'lib/nats/io/client.rb', line 815

# Stores the given block in @err_cb, replacing the previous callback.
# Calling it without a block stores nil.
def on_error(&callback)
  @err_cb = callback
end

#on_reconnect(&callback) ⇒ Object



823
824
825
# File 'lib/nats/io/client.rb', line 823

# Stores the given block in @reconnect_cb, replacing the previous callback.
# Calling it without a block stores nil.
def on_reconnect(&callback)
  @reconnect_cb = callback
end

#publish(subject, msg = EMPTY_MSG, opt_reply = nil, **options, &blk) ⇒ Object



472
473
474
475
476
477
478
479
480
481
482
483
484
485
# File 'lib/nats/io/client.rb', line 472

# Publishes a message on +subject+.
#
# When a +:header+ option is given, the call is delegated to publish_msg
# with a NATS::Msg built from the arguments. Otherwise a PUB command is
# handed to send_command and the outbound counters in @stats are updated.
#
# @param subject [String] destination subject
# @param msg [String, nil] payload; nil is published as an empty message
# @param opt_reply [String, nil] optional reply subject
# @param options [Hash] only +:header+ is read here
# @raise [NATS::IO::BadSubject] when subject is nil/false or empty
def publish(subject, msg = EMPTY_MSG, opt_reply = nil, **options, &blk)
  raise NATS::IO::BadSubject if !subject || subject.empty?
  if options[:header]
    return publish_msg(NATS::Msg.new(subject: subject, data: msg, reply: opt_reply, header: options[:header]))
  end

  # An explicit nil payload would otherwise fail on nil.bytesize; treat it
  # like an omitted payload.
  msg ||= EMPTY_MSG

  # Accounting
  msg_size = msg.bytesize
  @stats[:out_msgs] += 1
  @stats[:out_bytes] += msg_size

  send_command("PUB #{subject} #{opt_reply} #{msg_size}\r\n#{msg}\r\n")
  # Only wake the flusher when nothing is queued for it yet.
  @flush_queue << :pub if @flush_queue.empty?
end

#publish_msg(msg) ⇒ Object

Publishes a NATS::Msg that may include headers.

Raises:

  • (TypeError)


488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
# File 'lib/nats/io/client.rb', line 488

# Publishes a Msg, using HPUB when it carries a header and PUB otherwise.
#
# A missing reply or data on the message is replaced by an empty string
# (on the message itself). Only the payload size is added to the outbound
# byte counter.
#
# @param msg [Msg] the message to publish
# @raise [TypeError] when msg is not a Msg
# @raise [NATS::IO::BadSubject] when the message subject is nil/false or empty
def publish_msg(msg)
  unless msg.is_a?(Msg)
    raise TypeError, "nats: expected NATS::Msg, got #{msg.class.name}"
  end

  subject = msg.subject
  raise NATS::IO::BadSubject if !subject || subject.empty?

  msg.reply ||= "".dup
  msg.data ||= "".dup
  payload_size = msg.data.bytesize

  # Accounting
  @stats[:out_msgs] += 1
  @stats[:out_bytes] += payload_size

  command =
    if msg.header
      # Header block: version line, one "key: value" line per entry, then
      # an empty line terminating the block.
      header_block = "".dup
      header_block << NATS_HDR_LINE
      msg.header.each { |name, value| header_block << "#{name}: #{value}#{CR_LF}" }
      header_block << CR_LF

      header_size = header_block.bytesize
      "HPUB #{subject} #{msg.reply} #{header_size} #{payload_size + header_size}\r\n#{header_block}#{msg.data}\r\n"
    else
      "PUB #{subject} #{msg.reply} #{payload_size}\r\n#{msg.data}\r\n"
    end
  send_command(command)

  @flush_queue << :pub if @flush_queue.empty?
end

#reconnecting?Boolean

Returns:

  • (Boolean)


794
795
796
# File 'lib/nats/io/client.rb', line 794

# @return [Boolean] whether @status equals RECONNECTING
def reconnecting?
  @status == RECONNECTING
end

#request(subject, payload = "", **opts, &blk) ⇒ Object

Sends a request expecting a single response, using a single subscription per connection for receiving the responses. It times out in case the response is not received within the specified deadline. If given a callback, then the request happens asynchronously.



562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
# File 'lib/nats/io/client.rb', line 562

# Sends a request and waits for a single reply on the shared response
# subscription.
#
# Requests given a block, or the +:old_style+ option, are delegated to
# old_request; requests given a +:header+ option are delegated to
# request_msg.
#
# @param subject [String] request subject
# @param payload [String] request payload
# @param opts [Hash] +:timeout+ (default 0.5), +:old_style+, +:header+
# @return [Object, nil] the stored response for this request, if any
# @raise [NATS::IO::BadSubject] when subject is nil/false or empty
# @raise [NATS::IO::NoRespondersError] when the response carries a "503"
#   status header
def request(subject, payload = "", **opts, &blk)
  raise NATS::IO::BadSubject if !subject || subject.empty?

  # Callback-based and explicitly old-style requests go through the
  # auto-unsubscribe implementation.
  return old_request(subject, payload, opts, &blk) if blk
  return old_request(subject, payload, opts) if opts[:old_style]

  if opts[:header]
    header_msg = NATS::Msg.new(subject: subject, data: payload, header: opts[:header])
    return request_msg(header_msg, **opts)
  end

  timeout = (opts[:timeout] ||= 0.5)
  token = inbox = future = nil
  synchronize do
    start_resp_mux_sub! unless @resp_sub_prefix

    # Each request gets its own token, which is also the last part of its
    # reply inbox.
    token = @nuid.next
    inbox = "#{@resp_sub_prefix}.#{token}"

    # Condition the caller waits on until this request's entry is signaled.
    future = @resp_sub.new_cond
    @resp_map[token][:future] = future
  end

  # Publish request and wait for reply.
  publish(subject, payload, inbox)
  begin
    MonotonicTime.with_nats_timeout(timeout) do
      @resp_sub.synchronize { future.wait(timeout) }
    end
  rescue NATS::Timeout
    # Drop the bookkeeping entry before propagating the timeout.
    synchronize { @resp_map.delete(token) }
    raise
  end

  # Pick up whatever response was stored and clear the entry.
  reply = nil
  synchronize do
    entry = @resp_map[token]
    reply = entry[:response]
    @resp_map.delete(token)
  end

  if reply&.header && reply.header[STATUS_HDR] == "503"
    raise NATS::IO::NoRespondersError
  end

  reply
end

#request_msg(msg, **opts) ⇒ Object

request_msg makes a NATS request using a NATS::Msg that may include headers.

Raises:

  • (TypeError)


620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
# File 'lib/nats/io/client.rb', line 620

# Sends a request built from a Msg (which may carry headers) and waits for a
# single reply on the shared response subscription.
#
# The message's reply subject is overwritten with the generated inbox, and
# missing data is replaced by an empty string.
#
# @param msg [Msg] the request message
# @param opts [Hash] +:timeout+ (default 0.5)
# @return [Object, nil] the stored response for this request, if any
# @raise [TypeError] when msg is not a Msg
# @raise [NATS::IO::BadSubject] when the message subject is nil/false or empty
# @raise [NATS::IO::NoRespondersError] when the response carries a "503"
#   status header
def request_msg(msg, **opts)
  raise TypeError, "nats: expected NATS::Msg, got #{msg.class.name}" unless msg.is_a?(Msg)
  raise NATS::IO::BadSubject if !msg.subject || msg.subject.empty?

  token = nil
  inbox = nil
  future = nil
  response = nil
  timeout = opts[:timeout] ||= 0.5
  synchronize do
    start_resp_mux_sub! unless @resp_sub_prefix

    # Create token for this request.
    token = @nuid.next
    inbox = "#{@resp_sub_prefix}.#{token}"

    # Create a future for the request that will
    # get signaled when the response arrives.
    future = @resp_sub.new_cond
    @resp_map[token][:future] = future
  end
  msg.reply = inbox
  msg.data ||= ""

  # Publish request and wait for reply.
  publish_msg(msg)
  begin
    MonotonicTime.with_nats_timeout(timeout) do
      @resp_sub.synchronize do
        future.wait(timeout)
      end
    end
  rescue NATS::Timeout
    # Drop the bookkeeping entry before propagating the timeout.
    synchronize { @resp_map.delete(token) }
    raise
  end

  # Check if there is a response already.
  synchronize do
    result = @resp_map[token]
    response = result[:response]
    @resp_map.delete(token)
  end

  if response&.header
    status = response.header[STATUS_HDR]
    raise NATS::IO::NoRespondersError if status == "503"
  end

  response
end

#servicesObject



862
863
864
# File 'lib/nats/io/client.rb', line 862

# Returns the Services object for this client, creating it on first use.
# Creation and memoization in @_services happen inside the client lock.
def services
  synchronize { @_services ||= Services.new(self) }
end

#subscribe(subject, opts = {}, &callback) ⇒ Object

Creates a subscription whose messages are dispatched asynchronously to a callback.



519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
# File 'lib/nats/io/client.rb', line 519

# Registers a new subscription on +subject+ and sends the SUB command.
#
# The subscription gets the next subscription id and is stored in @subs.
# When +:max+ is given, the subscription's unsubscribe is called with that
# value right after subscribing. When no callback block is given, a
# condition variable is created and stored in wait_for_msgs_cond.
#
# The caller's +opts+ hash is never modified.
#
# @param subject [String] subject to subscribe to
# @param opts [Hash] +:queue+, +:max+, +:pending_msgs_limit+,
#   +:pending_bytes_limit+, +:processing_concurrency+
# @return [Subscription] the new subscription
# @raise [NATS::IO::ConnectionDrainingError] when the client is draining
def subscribe(subject, opts = {}, &callback)
  raise NATS::IO::ConnectionDrainingError, "nats: connection draining" if draining?

  sid = nil
  sub = nil
  synchronize do
    sid = (@ssid += 1)
    sub = @subs[sid] = Subscription.new
    sub.nc = self
    sub.sid = sid
  end

  # Resolve the limits into locals rather than writing the defaults back
  # into the options hash, which belongs to the caller.
  pending_msgs_limit = opts[:pending_msgs_limit] || NATS::IO::DEFAULT_SUB_PENDING_MSGS_LIMIT
  pending_bytes_limit = opts[:pending_bytes_limit] || NATS::IO::DEFAULT_SUB_PENDING_BYTES_LIMIT

  sub.subject = subject
  sub.callback = callback
  sub.received = 0
  sub.queue = opts[:queue] if opts[:queue]
  sub.max = opts[:max] if opts[:max]
  sub.pending_msgs_limit = pending_msgs_limit
  sub.pending_bytes_limit = pending_bytes_limit
  sub.pending_queue = SizedQueue.new(sub.pending_msgs_limit)
  sub.processing_concurrency = opts[:processing_concurrency] if opts.key?(:processing_concurrency)

  send_command("SUB #{subject} #{opts[:queue]} #{sid}#{CR_LF}")
  @flush_queue << :sub

  # Setup server support for auto-unsubscribe when receiving enough messages
  sub.unsubscribe(opts[:max]) if opts[:max]

  unless callback
    cond = sub.new_cond
    sub.wait_for_msgs_cond = cond
  end

  sub
end