Module: NATS

Defined in:
lib/nats/client.rb

Defined Under Namespace

Classes: ClientError, ConnectError, Error, ServerError

Constant Summary collapse

VERSION =
'0.4.28'.freeze
DEFAULT_PORT =
4222
DEFAULT_URI =
"nats://localhost:#{DEFAULT_PORT}".freeze
MAX_RECONNECT_ATTEMPTS =
10
RECONNECT_TIME_WAIT =
2
MAX_PENDING_SIZE =
32768
FAST_PRODUCER_THRESHOLD =

Maximum outbound size per client to trigger FP, 20MB

(10*1024*1024)
MSG =

Protocol

/\AMSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\r\n/i
OK =

:nodoc:

/\A\+OK\s*\r\n/i
ERR =

:nodoc:

/\A-ERR\s+('.+')?\r\n/i
PING =

:nodoc:

/\APING\s*\r\n/i
PONG =

:nodoc:

/\APONG\s*\r\n/i
INFO =

:nodoc:

/\AINFO\s+([^\r\n]+)\r\n/i
UNKNOWN =

:nodoc:

/\A(.*)\r\n/
CR_LF =

Responses

("\r\n".freeze)
CR_LF_SIZE =

:nodoc:

(CR_LF.bytesize)
PING_REQUEST =

:nodoc:

("PING#{CR_LF}".freeze)
PONG_RESPONSE =

:nodoc:

("PONG#{CR_LF}".freeze)
EMPTY_MSG =

:nodoc:

(''.freeze)
SUB =

Used for future pedantic Mode

/^([^\.\*>\s]+|>$|\*)(\.([^\.\*>\s]+|>$|\*))*$/
SUB_NO_WC =

:nodoc:

/^([^\.\*>\s]+)(\.([^\.\*>\s]+))*$/
AWAITING_CONTROL_LINE =

Parser

1
AWAITING_MSG_PAYLOAD =

:nodoc:

2
AUTOSTART_PID_FILE =

Autostart properties

'/tmp/nats-server.pid'
AUTOSTART_LOG_FILE =
'/tmp/nats-server.log'
@@tried_autostart =

Duplicate autostart protection

{}

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Class Attribute Details

.clientObject (readonly)

:nodoc:


71
72
73
# File 'lib/nats/client.rb', line 71

def client
  @client
end

.err_cbObject (readonly)

:nodoc:


71
72
73
# File 'lib/nats/client.rb', line 71

def err_cb
  @err_cb
end

.err_cb_overriddenObject (readonly)

:nodoc:


71
72
73
# File 'lib/nats/client.rb', line 71

def err_cb_overridden
  @err_cb_overridden
end

.reactor_was_runningObject (readonly) Also known as: reactor_was_running?

:nodoc:


71
72
73
# File 'lib/nats/client.rb', line 71

def reactor_was_running
  @reactor_was_running
end

.reconnect_cbObject (readonly)

:nodoc


72
73
74
# File 'lib/nats/client.rb', line 72

def reconnect_cb
  @reconnect_cb
end

.timeout_cbObject

:nodoc


73
74
75
# File 'lib/nats/client.rb', line 73

def timeout_cb
  @timeout_cb
end

Instance Attribute Details

#bytes_receivedObject (readonly)

Returns the value of attribute bytes_received


276
277
278
# File 'lib/nats/client.rb', line 276

def bytes_received
  @bytes_received
end

#bytes_sentObject (readonly)

Returns the value of attribute bytes_sent


276
277
278
# File 'lib/nats/client.rb', line 276

def bytes_sent
  @bytes_sent
end

#closingObject (readonly) Also known as: closing?

:nodoc


275
276
277
# File 'lib/nats/client.rb', line 275

def closing
  @closing
end

#connect_cbObject (readonly)

:nodoc:


274
275
276
# File 'lib/nats/client.rb', line 274

def connect_cb
  @connect_cb
end

#connectedObject (readonly) Also known as: connected?

:nodoc:


274
275
276
# File 'lib/nats/client.rb', line 274

def connected
  @connected
end

#err_cbObject (readonly)

:nodoc:


274
275
276
# File 'lib/nats/client.rb', line 274

def err_cb
  @err_cb
end

#err_cb_overriddenObject (readonly)

:nodoc:


274
275
276
# File 'lib/nats/client.rb', line 274

def err_cb_overridden
  @err_cb_overridden
end

#msgs_receivedObject (readonly)

Returns the value of attribute msgs_received


276
277
278
# File 'lib/nats/client.rb', line 276

def msgs_received
  @msgs_received
end

#msgs_sentObject (readonly)

Returns the value of attribute msgs_sent


276
277
278
# File 'lib/nats/client.rb', line 276

def msgs_sent
  @msgs_sent
end

#optionsObject (readonly)

:nodoc


275
276
277
# File 'lib/nats/client.rb', line 275

def options
  @options
end

#pingsObject (readonly)

Returns the value of attribute pings


276
277
278
# File 'lib/nats/client.rb', line 276

def pings
  @pings
end

#reconnectingObject (readonly) Also known as: reconnecting?

:nodoc


275
276
277
# File 'lib/nats/client.rb', line 275

def reconnecting
  @reconnecting
end

#server_infoObject (readonly)

:nodoc


275
276
277
# File 'lib/nats/client.rb', line 275

def server_info
  @server_info
end

Class Method Details

.clear_clientObject

:nodoc:


243
244
245
# File 'lib/nats/client.rb', line 243

def clear_client # :nodoc:
  @client = nil
end

.connect(opts = {}, &blk) ⇒ NATS

Create and return a connection to the server with the given options. The server will be autostarted if requested and the uri is determined to be local. The optional block will be called when the connection has been completed.

Parameters:

  • opts (Hash) (defaults to: {})
  • &blk (Block)

    called when the connection is completed. Connection will be passed to the block.

Options Hash (opts):

  • :uri (String|URI)

    The URI to connect to, example nats://localhost:4222

  • :autostart (Boolean)

    Boolean that can be used to engage server autostart functionality.

  • :reconnect (Boolean)

    Boolean that can be used to suppress reconnect functionality.

  • :debug (Boolean)

    Boolean that can be used to output additional debug information.

  • :verbose (Boolean)

    Boolean that is sent to server for setting verbose protocol mode.

  • :pedantic (Boolean)

    Boolean that is sent to server for setting pedantic mode.

  • :ssl (Boolean)

    Boolean that is sent to server for setting TLS/SSL mode.

  • :max_reconnect_attempts (Integer)

    Integer that can be used to set the max number of reconnect tries

  • :reconnect_time_wait (Integer)

    Integer that can be used to set the number of seconds to wait between reconnect tries

Returns:

  • (NATS)

    connection to the server.


93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/nats/client.rb', line 93

def connect(opts={}, &blk)
  # Defaults
  opts[:verbose] = false if opts[:verbose].nil?
  opts[:pedantic] = false if opts[:pedantic].nil?
  opts[:reconnect] = true if opts[:reconnect].nil?
  opts[:ssl] = false if opts[:ssl].nil?
  opts[:max_reconnect_attempts] = MAX_RECONNECT_ATTEMPTS if opts[:max_reconnect_attempts].nil?
  opts[:reconnect_time_wait] = RECONNECT_TIME_WAIT if opts[:reconnect_time_wait].nil?

  # Override with ENV
  opts[:uri] ||= ENV['NATS_URI'] || DEFAULT_URI
  opts[:verbose] = ENV['NATS_VERBOSE'].downcase == 'true' unless ENV['NATS_VERBOSE'].nil?
  opts[:pedantic] = ENV['NATS_PEDANTIC'].downcase == 'true' unless ENV['NATS_PEDANTIC'].nil?
  opts[:debug] = ENV['NATS_DEBUG'].downcase == 'true' unless ENV['NATS_DEBUG'].nil?
  opts[:reconnect] = ENV['NATS_RECONNECT'].downcase == 'true' unless ENV['NATS_RECONNECT'].nil?
  opts[:fast_producer_error] = ENV['NATS_FAST_PRODUCER'].downcase == 'true' unless ENV['NATS_FAST_PRODUCER'].nil?
  opts[:ssl] = ENV['NATS_SSL'].downcase == 'true' unless ENV['NATS_SSL'].nil?
  opts[:max_reconnect_attempts] = ENV['NATS_MAX_RECONNECT_ATTEMPTS'].to_i unless ENV['NATS_MAX_RECONNECT_ATTEMPTS'].nil?
  opts[:reconnect_time_wait] = ENV['NATS_RECONNECT_TIME_WAIT'].to_i unless ENV['NATS_RECONNECT_TIME_WAIT'].nil?
  @uri = opts[:uri] = opts[:uri].is_a?(URI) ? opts[:uri] : URI.parse(opts[:uri])
  @err_cb = proc { |e| raise e } unless err_cb
  check_autostart(@uri) if opts[:autostart] == true

  client = EM.connect(@uri.host, @uri.port, self, opts)
  client.on_connect(&blk) if blk
  return client
end

.connected?Boolean

Returns Connected state

Returns:

  • (Boolean)

    Connected state


143
144
145
146
# File 'lib/nats/client.rb', line 143

def connected?
  return false unless client
  client.connected?
end

.create_inboxString

Returns a subject that can be used for “directed” communications.

Returns:


210
211
212
# File 'lib/nats/client.rb', line 210

def create_inbox
  "_INBOX.#{SecureRandom.hex(13)}"
end

.flush(*args, &blk) ⇒ Object

Flushes all messages and subscriptions in the default connection

See Also:


216
217
218
# File 'lib/nats/client.rb', line 216

def flush(*args, &blk)
  (@client ||= connect).flush(*args, &blk)
end

.on_error(&callback) ⇒ Object

Set the default on_error callback.

Parameters:

  • &callback (Block)

    called when an error has been detected.


168
169
170
# File 'lib/nats/client.rb', line 168

def on_error(&callback)
  @err_cb, @err_cb_overridden = callback, true
end

.on_reconnect(&callback) ⇒ Object

Set the default on_reconnect callback.

Parameters:

  • &callback (Block)

    called when a reconnect attempt is made.


174
175
176
# File 'lib/nats/client.rb', line 174

def on_reconnect(&callback)
  @reconnect_cb = callback
end

.optionsHash

Returns Options

Returns:

  • (Hash)

    Options


155
156
157
158
# File 'lib/nats/client.rb', line 155

def options
  return {} unless client
  client.options
end

.pending_data_size(*args) ⇒ Object

Return bytes outstanding for the default client connection.

See Also:


222
223
224
# File 'lib/nats/client.rb', line 222

def pending_data_size(*args)
  (@client ||= connect).pending_data_size(*args)
end

.publish(*args, &blk) ⇒ Object

Publish a message using the default client connection.

See Also:


180
181
182
# File 'lib/nats/client.rb', line 180

def publish(*args, &blk)
  (@client ||= connect).publish(*args, &blk)
end

.reconnecting?Boolean

Returns Reconnecting state

Returns:

  • (Boolean)

    Reconnecting state


149
150
151
152
# File 'lib/nats/client.rb', line 149

def reconnecting?
  return false unless client
  client.reconnecting?
end

.request(*args, &blk) ⇒ Object

Publish a message and wait for a response on the default client connection.

See Also:


204
205
206
# File 'lib/nats/client.rb', line 204

def request(*args, &blk)
  (@client ||= connect).request(*args, &blk)
end

.server_infoHash

Returns Server information

Returns:

  • (Hash)

    Server information


161
162
163
164
# File 'lib/nats/client.rb', line 161

def server_info
  return nil unless client
  client.server_info
end

.server_running?(uri) ⇒ Boolean

:nodoc:

Returns:

  • (Boolean)

234
235
236
237
238
239
240
241
# File 'lib/nats/client.rb', line 234

def server_running?(uri) # :nodoc:
  require 'socket'
  s = TCPSocket.new(uri.host, uri.port)
  s.close
  return true
rescue
  return false
end

.start(*args, &blk) ⇒ Object

Create a default client connection to the server.

See Also:

  • connect

123
124
125
126
127
128
129
130
131
# File 'lib/nats/client.rb', line 123

def start(*args, &blk)
  @reactor_was_running = EM.reactor_running?
  unless (@reactor_was_running || blk)
    raise(Error, "EM needs to be running when NATS.start called without a run block")
  end
  # Setup optimized select versions
  EM.epoll; EM.kqueue
  EM.run { @client = connect(*args, &blk) }
end

.stop(&blk) ⇒ Object

Close the default client connection and optionally call the associated block.

Parameters:

  • &blk (Block)

    called when the connection is closed.


135
136
137
138
139
140
# File 'lib/nats/client.rb', line 135

def stop(&blk)
  client.close if (client and (client.connected? || client.reconnecting?))
  blk.call if blk
  @@tried_autostart = {}
  @err_cb = nil
end

.subscribe(*args, &blk) ⇒ Object

Subscribe using the default client connection.

See Also:


186
187
188
# File 'lib/nats/client.rb', line 186

def subscribe(*args, &blk)
  (@client ||= connect).subscribe(*args, &blk)
end

.timeout(*args, &blk) ⇒ Object

Set a timeout for receiving messages for the subscription.

See Also:


198
199
200
# File 'lib/nats/client.rb', line 198

def timeout(*args, &blk)
  (@client ||= connect).timeout(*args, &blk)
end

.unsubscribe(*args) ⇒ Object

Cancel a subscription on the default client connection.

See Also:


192
193
194
# File 'lib/nats/client.rb', line 192

def unsubscribe(*args)
  (@client ||= connect).unsubscribe(*args)
end

.wait_for_server(uri, max_wait = 5) ⇒ Object

:nodoc:


226
227
228
229
230
231
232
# File 'lib/nats/client.rb', line 226

def wait_for_server(uri, max_wait = 5) # :nodoc:
  start = Time.now
  while (Time.now - start < max_wait) # Wait max_wait seconds max
    break if server_running?(uri)
    sleep(0.1)
  end
end

Instance Method Details

#attempt_reconnectObject

:nodoc:


607
608
609
610
611
612
613
614
# File 'lib/nats/client.rb', line 607

def attempt_reconnect #:nodoc:
  process_disconnect and return if (@reconnect_attempts += 1) > @options[:max_reconnect_attempts]
  begin
    EM.reconnect(@uri.host, @uri.port, self)
  rescue
  end
  @reconnect_cb.call unless @reconnect_cb.nil?
end

#closeObject

Close the connection to the server.


418
419
420
421
422
423
# File 'lib/nats/client.rb', line 418

def close
  @closing = true
  EM.cancel_timer(@reconnect_timer) if @reconnect_timer
  close_connection_after_writing if connected?
  process_disconnect if reconnecting?
end

#connection_completedObject

:nodoc:


555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
# File 'lib/nats/client.rb', line 555

def connection_completed #:nodoc:
  @connected = true unless @ssl
  if reconnecting?
    EM.cancel_timer(@reconnect_timer)
    send_connect_command
    @subs.each_pair { |k, v| send_command("SUB #{v[:subject]} #{v[:queue]} #{k}#{CR_LF}") }
  end
  flush_pending unless @ssl
  unless user_err_cb? or reconnecting?
    @err_cb = proc { |e| raise e }
  end
  if (connect_cb and not reconnecting?)
    # We will round trip the server here to make sure all state from any pending commands
    # has been processed before calling the connect callback.
    queue_server_rt { connect_cb.call(self) }
  end
  @reconnecting = false
  @parse_state = AWAITING_CONTROL_LINE
end

#disconnect_error_stringObject


590
591
592
593
# File 'lib/nats/client.rb', line 590

def disconnect_error_string
  return "Client disconnected from server on #{@uri}." if @connected
  return "Could not connect to server on #{@uri}"
end

#flush(&blk) ⇒ Object

Flushes all messages and subscriptions for the connection. All messages and subscriptions have been processed by the server when the optional callback is called.


395
396
397
# File 'lib/nats/client.rb', line 395

def flush(&blk)
  queue_server_rt(&blk) if blk
end

#flush_pendingObject

:nodoc:


483
484
485
486
487
# File 'lib/nats/client.rb', line 483

def flush_pending #:nodoc:
  return unless @pending
  send_data(@pending.join)
  @pending, @pending_size = nil, 0
end

#initialize(options) ⇒ Object


282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
# File 'lib/nats/client.rb', line 282

def initialize(options)
  @uri = options[:uri]
  @uri.user = options[:user] if options[:user]
  @uri.password = options[:pass] if options[:pass]
  @ssl = options[:ssl] if options[:ssl]
  @options = options
  @ssid, @subs = 1, {}
  @err_cb = NATS.err_cb
  @reconnect_timer, @needed = nil, nil
  @reconnect_cb = NATS.reconnect_cb
  @connected, @closing, @reconnecting = false, false, false
  @msgs_received = @msgs_sent = @bytes_received = @bytes_sent = @pings = 0
  @pending_size = 0
  send_connect_command
end

#inspectObject

:nodoc:


629
630
631
# File 'lib/nats/client.rb', line 629

def inspect #:nodoc:
  "<nats client v#{NATS::VERSION}>"
end

#on_connect(&callback) ⇒ Object

Define a callback to be called when the client connection has been established.

Parameters:

  • callback (Block)

401
402
403
# File 'lib/nats/client.rb', line 401

def on_connect(&callback)
  @connect_cb = callback
end

#on_error(&callback) ⇒ Object

Define a callback to be called when errors occur on the client connection.

Parameters:

  • &blk (Block)

    called when the connection is closed.


407
408
409
# File 'lib/nats/client.rb', line 407

def on_error(&callback)
  @err_cb, @err_cb_overridden = callback, true
end

#on_msg(subject, sid, reply, msg) ⇒ Object

:nodoc:


450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
# File 'lib/nats/client.rb', line 450

def on_msg(subject, sid, reply, msg) #:nodoc:

  # Accounting - We should account for inbound even if they are not processed.
  @msgs_received += 1
  @bytes_received += msg.bytesize if msg

  return unless sub = @subs[sid]

  # Check for auto_unsubscribe
  sub[:received] += 1
  if sub[:max]
    # Client side support in case server did not receive unsubscribe
    return unsubscribe(sid) if (sub[:received] > sub[:max])
    # cleanup here if we have hit the max..
    @subs.delete(sid) if (sub[:received] == sub[:max])
  end

  if cb = sub[:callback]
    case cb.arity
      when 0 then cb.call
      when 1 then cb.call(msg)
      when 2 then cb.call(msg, reply)
      else cb.call(msg, reply, subject)
    end
  end

  # Check for a timeout, and cancel if received >= expected
  if (sub[:timeout] && sub[:received] >= sub[:expected])
    EM.cancel_timer(sub[:timeout])
    sub[:timeout] = nil
  end
end

#on_reconnect(&callback) ⇒ Object

Define a callback to be called when a reconnect attempt is made.

Parameters:

  • &blk (Block)

    called when the connection is closed.


413
414
415
# File 'lib/nats/client.rb', line 413

def on_reconnect(&callback)
  @reconnect_cb = callback
end

#pending_data_sizeObject

Return bytes outstanding waiting to be sent to server.


426
427
428
# File 'lib/nats/client.rb', line 426

def pending_data_size
  get_outbound_data_size + @pending_size
end

#process_disconnectObject

:nodoc:


595
596
597
598
599
600
601
602
603
604
605
# File 'lib/nats/client.rb', line 595

def process_disconnect #:nodoc:
  err_cb.call(NATS::ConnectError.new(disconnect_error_string)) if not closing? and @err_cb
ensure
  EM.cancel_timer(@reconnect_timer) if @reconnect_timer
  if (NATS.client == self)
    NATS.clear_client
    EM.stop if ((connected? || reconnecting?) and closing? and not NATS.reactor_was_running?)
  end
  @connected = @reconnecting = false
  true # Chaining
end

#process_info(info) ⇒ Object

:nodoc:


536
537
538
539
540
541
542
543
544
545
546
547
548
# File 'lib/nats/client.rb', line 536

def process_info(info) #:nodoc:
  @server_info = JSON.parse(info, :symbolize_keys => true, :symbolize_names => true)
  if @server_info[:ssl_required] && @ssl
    start_tls
  else
    if @server_info[:ssl_required]
      err_cb.call(NATS::ClientError.new('TLS/SSL required by server'))
    elsif @ssl
      err_cb.call(NATS::ClientError.new('TLS/SSL not supported by server'))
    end
  end
  @server_info
end

#publish(subject, msg = EMPTY_MSG, opt_reply = nil, &blk) ⇒ Object

Publish a message to a given subject, with optional reply subject and completion block

Parameters:

  • subject (String)
  • msg (Object, #to_s) (defaults to: EMPTY_MSG)
  • opt_reply (String) (defaults to: nil)
  • blk, (Block)

    closure called when publish has been processed by the server.


303
304
305
306
307
308
309
310
311
312
313
# File 'lib/nats/client.rb', line 303

def publish(subject, msg=EMPTY_MSG, opt_reply=nil, &blk)
  return unless subject
  msg = msg.to_s

  # Accounting
  @msgs_sent += 1
  @bytes_sent += msg.bytesize if msg

  send_command("PUB #{subject} #{opt_reply} #{msg.bytesize}#{CR_LF}#{msg}#{CR_LF}")
  queue_server_rt(&blk) if blk
end

#queue_server_rt(&cb) ⇒ Object

:nodoc:


444
445
446
447
448
# File 'lib/nats/client.rb', line 444

def queue_server_rt(&cb) #:nodoc:
  return unless cb
  (@pongs ||= []) << cb
  send_command(PING_REQUEST)
end

#receive_data(data) ⇒ Object

:nodoc:


489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
# File 'lib/nats/client.rb', line 489

def receive_data(data) #:nodoc:
  @buf = @buf ? @buf << data : data
  while (@buf)
    case @parse_state
    when AWAITING_CONTROL_LINE
      case @buf
      when MSG
        @buf = $'
        @sub, @sid, @reply, @needed = $1, $2.to_i, $4, $5.to_i
        @parse_state = AWAITING_MSG_PAYLOAD
      when OK # No-op right now
        @buf = $'
      when ERR
        @buf = $'
        err_cb.call(NATS::ServerError.new($1))
      when PING
        @pings += 1
        @buf = $'
        send_command(PONG_RESPONSE)
      when PONG
        @buf = $'
        cb = @pongs.shift
        cb.call if cb
      when INFO
        @buf = $'
        process_info($1)
      when UNKNOWN
        @buf = $'
        err_cb.call(NATS::ServerError.new("Unknown protocol: #{$1}"))
      else
        # If we are here we do not have a complete line yet that we understand.
        return
      end
      @buf = nil if (@buf && @buf.empty?)

    when AWAITING_MSG_PAYLOAD
      return unless (@needed && @buf.bytesize >= (@needed + CR_LF_SIZE))
      on_msg(@sub, @sid, @reply, @buf.slice(0, @needed))
      @buf = @buf.slice((@needed + CR_LF_SIZE), @buf.bytesize)
      @sub = @sid = @reply = @needed = nil
      @parse_state = AWAITING_CONTROL_LINE
      @buf = nil if (@buf && @buf.empty?)
    end

  end
end

#request(subject, data = nil, opts = {}, &cb) ⇒ Object

Send a request and have the response delivered to the supplied callback.

Parameters:

  • subject (String)
  • msg (Object)
  • callback (Block)

Returns:

  • (Object)

    sid


378
379
380
381
382
383
384
385
386
387
388
389
390
# File 'lib/nats/client.rb', line 378

def request(subject, data=nil, opts={}, &cb)
  return unless subject
  inbox = NATS.create_inbox
  s = subscribe(inbox, opts) { |msg, reply|
    case cb.arity
      when 0 then cb.call
      when 1 then cb.call(msg)
      else cb.call(msg, reply)
    end
  }
  publish(subject, data, inbox)
  return s
end

#schedule_reconnect(wait = RECONNECT_TIME_WAIT) ⇒ Object

:nodoc:


575
576
577
578
579
580
# File 'lib/nats/client.rb', line 575

def schedule_reconnect(wait=RECONNECT_TIME_WAIT) #:nodoc:
  @reconnecting = true
  @reconnect_attempts = 0
  @connected = false
  @reconnect_timer = EM.add_periodic_timer(wait) { attempt_reconnect }
end

#send_command(command, priority = false) ⇒ Object

:nodoc:


616
617
618
619
620
621
622
623
624
625
626
627
# File 'lib/nats/client.rb', line 616

def send_command(command, priority = false) #:nodoc:
  EM.next_tick { flush_pending } if (connected? && @pending.nil?)
  @pending ||= []
  @pending << command unless priority
  @pending.unshift(command) if priority
  @pending_size += command.bytesize
  flush_pending if (connected? && @pending_size > MAX_PENDING_SIZE)
  if (@options[:fast_producer_error] && pending_data_size > FAST_PRODUCER_THRESHOLD)
    err_cb.call(NATS::ClientError.new("Fast Producer: #{pending_data_size} bytes outstanding"))
  end
  true
end

#send_connect_commandObject

:nodoc:


434
435
436
437
438
439
440
441
442
# File 'lib/nats/client.rb', line 434

def send_connect_command #:nodoc:
  cs = { :verbose => @options[:verbose], :pedantic => @options[:pedantic] }
  if @uri.user
    cs[:user] = @uri.user
    cs[:pass] = @uri.password
  end
  cs[:ssl_required] = @ssl if @ssl
  send_command("CONNECT #{cs.to_json}#{CR_LF}", true)
end

#ssl_handshake_completedObject


550
551
552
553
# File 'lib/nats/client.rb', line 550

def ssl_handshake_completed
  @connected = true
  flush_pending
end

#subscribe(subject, opts = {}, &callback) ⇒ Object

Subscribe to a subject with optional wildcards. Messages will be delivered to the supplied callback. Callback can take any number of the supplied arguments as defined by the list: msg, reply, sub. Returns subscription id which can be passed to #unsubscribe.

Parameters:

  • subject, (String)

    optionally with wilcards.

  • opts, (Hash)

    optional options hash, e.g. :queue, :max.

  • callback, (Block)

    called when a message is delivered.

Returns:

  • (Object)

    sid, Subject Identifier


323
324
325
326
327
328
329
330
331
332
333
# File 'lib/nats/client.rb', line 323

def subscribe(subject, opts={}, &callback)
  return unless subject
  sid = (@ssid += 1)
  sub = @subs[sid] = { :subject => subject, :callback => callback, :received => 0 }
  sub[:queue] = opts[:queue] if opts[:queue]
  sub[:max] = opts[:max] if opts[:max]
  send_command("SUB #{subject} #{opts[:queue]} #{sid}#{CR_LF}")
  # Setup server support for auto-unsubscribe
  unsubscribe(sid, opts[:max]) if opts[:max]
  sid
end

#subscription_countNumber

Return the active subscription count.

Returns:

  • (Number)

348
349
350
# File 'lib/nats/client.rb', line 348

def subscription_count
  @subs.size
end

#timeout(sid, timeout, opts = {}, &callback) ⇒ Object

Setup a timeout for receiving messages for the subscription.

Parameters:

  • sid (Object)
  • timeout, (Number)

    float in seconds

  • opts, (Hash)

    options, :auto_unsubscribe(true), :expected(1)


356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
# File 'lib/nats/client.rb', line 356

def timeout(sid, timeout, opts={}, &callback)
  # Setup a timeout if requested
  return unless sub = @subs[sid]

  auto_unsubscribe, expected = true, 1
  auto_unsubscribe = opts[:auto_unsubscribe] if opts.key?(:auto_unsubscribe)
  expected = opts[:expected] if opts.key?(:expected)

  EM.cancel_timer(sub[:timeout]) if sub[:timeout]

  sub[:timeout] = EM.add_timer(timeout) do
    unsubscribe(sid) if auto_unsubscribe
    callback.call(sid) if callback
  end
  sub[:expected] = expected
end

#unbindObject

:nodoc:


582
583
584
585
586
587
588
# File 'lib/nats/client.rb', line 582

def unbind #:nodoc:
  if connected? and not closing? and not reconnecting? and @options[:reconnect]
    schedule_reconnect(@options[:reconnect_time_wait])
  else
    process_disconnect unless reconnecting?
  end
end

#unsubscribe(sid, opt_max = nil) ⇒ Object

Cancel a subscription.

Parameters:

  • sid (Object)
  • opt_max, (Number)

    optional number of responses to receive before auto-unsubscribing


338
339
340
341
342
343
344
# File 'lib/nats/client.rb', line 338

def unsubscribe(sid, opt_max=nil)
  opt_max_str = " #{opt_max}" unless opt_max.nil?
  send_command("UNSUB #{sid}#{opt_max_str}#{CR_LF}")
  return unless sub = @subs[sid]
  sub[:max] = opt_max
  @subs.delete(sid) unless (sub[:max] && (sub[:received] < sub[:max]))
end

#user_err_cb?Boolean

:nodoc:

Returns:

  • (Boolean)

430
431
432
# File 'lib/nats/client.rb', line 430

def user_err_cb? # :nodoc:
  err_cb_overridden || NATS.err_cb_overridden
end