Class: NATS::IO::Client

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/nats/io/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Client

Returns a new instance of Client.



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/nats/io/client.rb', line 86

# Sets up the initial, disconnected state of the client. Connection
# options, the socket and the write queues stay unset here; #connect
# is what fills them in.
def initialize
  super # required to initialize monitor

  # Options handed to #connect
  @options = nil

  # Socket used for both reading and writing
  @io = nil

  # Write-coalescing queues for commands headed to the server
  @flush_queue = nil
  @pending_queue = nil

  # Stateful protocol parser bound to this client
  @parser = NATS::Protocol::Parser.new(self)

  # Background threads: flusher, read loop and ping interval
  @flusher_thread = nil
  @read_loop_thread = nil
  @ping_interval_thread = nil

  # Info that we get from the server
  @server_info = {}

  # Current server URI plus the pool of known servers
  @uri = nil
  @server_pool = []

  @status = DISCONNECTED

  # Subscriptions keyed by sid, and the last sid handed out
  @subs = {}
  @ssid = 0

  # Ping/pong bookkeeping; the waiter list carries its own monitor
  @pings_outstanding = 0
  @pongs_received = 0
  @pongs = [].extend(MonitorMixin)

  # Accounting
  @pending_size = 0
  @stats = { in_msgs: 0, out_msgs: 0, in_bytes: 0, out_bytes: 0, reconnects: 0 }

  # Sticky error
  @last_err = nil

  # Async callbacks; by default errors are re-raised and the rest are no-ops
  @err_cb = proc { |e| raise e }
  @close_cb = proc {}
  @disconnect_cb = proc {}
  @reconnect_cb = proc {}

  # Secure TLS options
  @tls = nil
end

Instance Attribute Details

#connected_server ⇒ Object (readonly)

Returns the value of attribute connected_server.



84
85
86
# File 'lib/nats/io/client.rb', line 84

# Reader for @connected_server.
# NOTE(review): nothing in the visible methods assigns @connected_server,
# so this returns nil unless it is set elsewhere — confirm.
def connected_server
  @connected_server
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



84
85
86
# File 'lib/nats/io/client.rb', line 84

# Options hash recorded by #connect (defaults and ENV overrides applied);
# nil until #connect has been called.
def options
  @options
end

#server_info ⇒ Object (readonly)

Returns the value of attribute server_info.



84
85
86
# File 'lib/nats/io/client.rb', line 84

# Hash of information received from the server; starts out empty.
# NOTE(review): where it gets populated is not visible here.
def server_info
  @server_info
end

#server_pool ⇒ Object (readonly) Also known as: servers

Returns the value of attribute server_pool.



84
85
86
# File 'lib/nats/io/client.rb', line 84

# Array of server entries (hashes). #connect appends one entry per
# configured URI under the :uri key and later marks entries with
# :was_connected and :reconnect_attempts.
def server_pool
  @server_pool
end

#stats ⇒ Object (readonly)

Returns the value of attribute stats.



84
85
86
# File 'lib/nats/io/client.rb', line 84

# Accounting hash with the counters :in_msgs, :out_msgs, :in_bytes,
# :out_bytes and :reconnects. The returned hash is the live object,
# not a copy.
def stats
  @stats
end

#status ⇒ Object (readonly)

Returns the value of attribute status.



84
85
86
# File 'lib/nats/io/client.rb', line 84

# Current connection state: DISCONNECTED after construction, CONNECTING
# and then CONNECTED during #connect, CLOSED after #close.
def status
  @status
end

#uri ⇒ Object (readonly)

Returns the value of attribute uri.



84
85
86
# File 'lib/nats/io/client.rb', line 84

# URI of the server to which the client is currently connected; nil
# after construction.
# NOTE(review): the assignment of @uri is not visible here.
def uri
  @uri
end

Instance Method Details

#close ⇒ Object

Closes the connection to NATS, first flushing any pending messages if the connection is still alive. Must not be called while holding the lock.



453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
# File 'lib/nats/io/client.rb', line 453

# Close connection to NATS, flushing in case connection is alive
# and there are any pending messages. Should not be used while
# holding the lock.
#
# The call is idempotent: once the status is CLOSED, later calls return
# immediately. It is also safe on a client that never connected, since
# the flush queue and the worker threads are only touched when present.
#
# Side effects: stops the ping interval, flusher and read loop threads,
# signals and clears all pending pong waiters, writes any queued commands
# to the socket if it is still open, invokes the disconnect and close
# callbacks, and finally closes the socket.
def close
  synchronize do
    return if @status == CLOSED
    @status = CLOSED
  end

  # Kick the flusher so it bails due to closed state. The queue only
  # exists after a successful connect.
  @flush_queue << :fallout if @flush_queue
  Thread.pass

  # FIXME: More graceful way of handling the following?
  # Ensure ping interval, flusher and read loop are not running anymore.
  # Any of them may still be nil when connect never completed.
  [@ping_interval_thread, @flusher_thread, @read_loop_thread].each do |thread|
    thread.exit if thread && thread.alive?
  end

  # TODO: Delete any other state which we are not using here too.
  synchronize do
    # Wake up everybody blocked in flush so they do not wait on a
    # connection which is going away.
    @pongs.synchronize do
      @pongs.each do |pong|
        pong.signal
      end
      @pongs.clear
    end

    # Try to write any pending flushes in case
    # we have a connection then close it.
    begin
      cmds = []
      cmds << @pending_queue.pop until @pending_queue.empty?

      # FIXME: Fails when empty on TLS connection?
      @io.write(cmds.join) unless cmds.empty?
    rescue => e
      # Best effort only: record the failure and report it, then
      # carry on tearing down the connection.
      @last_err = e
      @err_cb.call(e) if @err_cb
    end if @io and not @io.closed?

    # TODO: Destroy any remaining subscriptions
    @disconnect_cb.call if @disconnect_cb
    @close_cb.call if @close_cb

    # Close the established connection in case
    # we still have it.
    if @io
      @io.close
      @io = nil
    end
  end
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


524
525
526
# File 'lib/nats/io/client.rb', line 524

# True once #close has moved the client into the CLOSED state.
def closed?
  @status == CLOSED
end

#connect(opts = {}) ⇒ Object

Establishes connection to NATS



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/nats/io/client.rb', line 148

# Establishes connection to NATS.
#
# Defaults are filled in for :verbose, :pedantic, :reconnect,
# :reconnect_time_wait, :max_reconnect_attempts, :ping_interval and
# :max_outstanding_pings; the matching NATS_* environment variables, when
# set, take precedence over whatever was passed in. Also read from the
# options are :servers (list of URIs or URI strings),
# :dont_randomize_servers and :tls.
#
# The given hash and its :servers list are never modified: the options
# are copied and the server list is shuffled into a new array.
#
# @param opts [Hash] connection options
# @raise [NoServersError] re-raised after notifying the disconnect callback
# @raise [StandardError] the connection error, when should_not_reconnect?
#   reports that no further attempts are to be made
def connect(opts={})
  # Work on a private copy so that the caller's hash (possibly frozen or
  # shared between clients) is left untouched by the defaults below.
  opts = opts.dup

  opts[:verbose] = false if opts[:verbose].nil?
  opts[:pedantic] = false if opts[:pedantic].nil?
  opts[:reconnect] = true if opts[:reconnect].nil?
  opts[:reconnect_time_wait] = RECONNECT_TIME_WAIT if opts[:reconnect_time_wait].nil?
  opts[:max_reconnect_attempts] = MAX_RECONNECT_ATTEMPTS if opts[:max_reconnect_attempts].nil?
  opts[:ping_interval] = DEFAULT_PING_INTERVAL if opts[:ping_interval].nil?
  opts[:max_outstanding_pings] = DEFAULT_PING_MAX if opts[:max_outstanding_pings].nil?

  # Override with ENV
  opts[:verbose] = ENV['NATS_VERBOSE'].downcase == 'true' unless ENV['NATS_VERBOSE'].nil?
  opts[:pedantic] = ENV['NATS_PEDANTIC'].downcase == 'true' unless ENV['NATS_PEDANTIC'].nil?
  opts[:reconnect] = ENV['NATS_RECONNECT'].downcase == 'true' unless ENV['NATS_RECONNECT'].nil?
  opts[:reconnect_time_wait] = ENV['NATS_RECONNECT_TIME_WAIT'].to_i unless ENV['NATS_RECONNECT_TIME_WAIT'].nil?
  opts[:max_reconnect_attempts] = ENV['NATS_MAX_RECONNECT_ATTEMPTS'].to_i unless ENV['NATS_MAX_RECONNECT_ATTEMPTS'].nil?
  opts[:ping_interval] = ENV['NATS_PING_INTERVAL'].to_i unless ENV['NATS_PING_INTERVAL'].nil?
  opts[:max_outstanding_pings] = ENV['NATS_MAX_OUTSTANDING_PINGS'].to_i unless ENV['NATS_MAX_OUTSTANDING_PINGS'].nil?
  @options = opts

  # Process servers in the NATS cluster and pick one to connect.
  # Non-destructive shuffle: the caller's :servers array keeps its order.
  uris = opts[:servers] || [DEFAULT_URI]
  uris = uris.shuffle unless @options[:dont_randomize_servers]
  uris.each do |u|
    @server_pool << { :uri => u.is_a?(URI) ? u.dup : URI.parse(u) }
  end

  # Check for TLS usage
  @tls = @options[:tls]

  begin
    current = select_next_server

    # Create TCP socket connection to NATS
    @io = create_socket
    @io.connect

    # Capture state that we have had a TCP connection established against
    # this server and could potentially be used for reconnecting.
    current[:was_connected] = true

    # Connection established and now in process of sending CONNECT to NATS
    @status = CONNECTING

    # Established TCP connection successfully so can start connect
    process_connect_init

    # Reset reconnection attempts if connection is valid
    current[:reconnect_attempts] = 0
  rescue NoServersError => e
    @disconnect_cb.call(e) if @disconnect_cb
    raise e
  rescue => e
    # Capture sticky error
    synchronize { @last_err = e }

    @err_cb.call(e) if @err_cb

    if should_not_reconnect?
      @disconnect_cb.call(e) if @disconnect_cb
      raise e
    end

    # always sleep here to safe guard against errors before current[:was_connected]
    # is set for the first time
    sleep @options[:reconnect_time_wait] if @options[:reconnect_time_wait]

    # Continue retrying until there are no options left in the server pool
    retry
  end

  # Initialize queues and loops for message dispatching and processing engine
  @flush_queue = SizedQueue.new(MAX_FLUSH_KICK_SIZE)
  @pending_queue = SizedQueue.new(MAX_PENDING_SIZE)
  @pings_outstanding = 0
  @pongs_received = 0
  @pending_size = 0

  # Server roundtrip went ok so consider to be connected at this point
  @status = CONNECTED

  # Connected to NATS so Ready to start parser loop, flusher and ping interval
  start_threads!
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


512
513
514
# File 'lib/nats/io/client.rb', line 512

# True while the status is CONNECTED, which #connect sets once the
# server roundtrip has succeeded.
def connected?
  @status == CONNECTED
end

#connecting? ⇒ Boolean

Returns:

  • (Boolean)


516
517
518
# File 'lib/nats/io/client.rb', line 516

# True while the status is CONNECTING, i.e. the socket is established and
# #connect is still in the process of sending CONNECT to the server.
def connecting?
  @status == CONNECTING
end

#discovered_servers ⇒ Object



354
355
356
# File 'lib/nats/io/client.rb', line 354

# Returns the entries of the server pool which carry a truthy
# :discovered flag.
def discovered_servers
  servers.select do |entry|
    entry[:discovered]
  end
end

#flush(timeout = 60) ⇒ Object

Send a ping and wait for a pong back within a timeout.



336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
# File 'lib/nats/io/client.rb', line 336

# Sends a PING and blocks until the matching PONG is signalled, bounded
# by +timeout+ seconds.
#
# @param timeout [Numeric] how long to wait for the PONG
def flush(timeout=60)
  # The waiter has to be registered before the PING is scheduled, so
  # that the PONG handler always finds it.
  waiter = @pongs.new_cond

  @pongs.synchronize do
    @pongs << waiter

    # Queue the PING and kick the flusher
    @pending_queue << PING_REQUEST
    @flush_queue << :ping

    # Block until signalled or until the deadline has passed
    with_nats_timeout(timeout) { waiter.wait(timeout) }
  end
end

#last_error ⇒ Object



544
545
546
547
548
# File 'lib/nats/io/client.rb', line 544

# Returns the sticky error recorded last, read while holding the
# client lock; nil when nothing has been recorded.
def last_error
  synchronize { @last_err }
end

#new_inbox ⇒ Object



504
505
506
# File 'lib/nats/io/client.rb', line 504

# Generates a fresh inbox subject: the "_INBOX." prefix followed by
# 13 random bytes rendered as hex.
def new_inbox
  token = SecureRandom.hex(13)
  "_INBOX.#{token}"
end

#on_close(&callback) ⇒ Object



540
541
542
# File 'lib/nats/io/client.rb', line 540

# Registers the block which #close invokes (without arguments) while
# shutting the connection down. Replaces any previous close callback.
def on_close(&callback)
  @close_cb = callback
end

#on_disconnect(&callback) ⇒ Object



532
533
534
# File 'lib/nats/io/client.rb', line 532

# Registers the block invoked on disconnection. #connect calls it with
# the error when it gives up connecting, #close calls it without
# arguments. Replaces any previous disconnect callback.
def on_disconnect(&callback)
  @disconnect_cb = callback
end

#on_error(&callback) ⇒ Object



528
529
530
# File 'lib/nats/io/client.rb', line 528

# Registers the block which receives errors (connection failures in
# #connect, write failures in #close, server errors in #process_err).
# Replaces the default callback, which re-raises the error.
def on_error(&callback)
  @err_cb = callback
end

#on_reconnect(&callback) ⇒ Object



536
537
538
# File 'lib/nats/io/client.rb', line 536

# Registers the reconnect callback, replacing any previous one.
# NOTE(review): the place where @reconnect_cb gets invoked is not
# visible here.
def on_reconnect(&callback)
  @reconnect_cb = callback
end

#process_err(err) ⇒ Object

Handles protocol errors being sent by the server.



379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
# File 'lib/nats/io/client.rb', line 379

# Handles protocol errors being sent by the server.
#
# Flags the first entry of the server pool as having received an error,
# hands the error to the error callback (as an AuthError when that entry
# is marked :auth_required, as a ServerError otherwise), records a
# ServerError as the sticky error and then closes the connection.
#
# @param err [String] error text as sent by the server
def process_err(err)
  # FIXME: In case of a stale connection, then handle as process_op_error

  active = server_pool.first
  active[:error_received] = true

  # Dispatch to the error callback first; note that this happens
  # without holding the client lock.
  error_class = active[:auth_required] ? NATS::IO::AuthError : NATS::IO::ServerError
  @err_cb.call(error_class.new(err))

  # Keep the error around as the sticky one, then shut down gracefully.
  synchronize { @last_err = NATS::IO::ServerError.new(err) }

  close
end

#process_msg(subject, sid, reply, data) ⇒ Object



401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
# File 'lib/nats/io/client.rb', line 401

# Dispatches a message received from the server to the subscription
# registered under +sid+.
#
# Inbound counters are updated for every message, including the ones
# whose subscription is gone. A subscription with a future (synchronous
# request) gets the message stored as its response and the future
# signalled; otherwise the subscription callback is invoked with as many
# of (data, reply, subject) as its arity asks for.
#
# @param subject [String] subject the message was published on
# @param sid [Integer] subscription id
# @param reply [String, nil] reply subject, if any
# @param data [String] message payload
def process_msg(subject, sid, reply, data)
  # Accounting
  @stats[:in_msgs] += 1
  @stats[:in_bytes] += data.size

  # Nothing to do in case we no longer manage the subscription
  sub = synchronize { @subs[sid] }
  return unless sub

  sub.synchronize do
    sub.received += 1

    # Auto-unsubscribe bookkeeping
    limit = sub.max
    if limit
      if sub.received > limit
        # Client side support in case server did not receive unsubscribe
        unsubscribe(sid)
        return
      elsif sub.received == limit
        # The max has been reached, so this is the last message
        @subs.delete(sid)
      end
    end

    # A synchronous request is waiting: deliver the response and wake
    # it up while still holding the subscription lock.
    waiting = sub.future
    if waiting
      sub.response = Msg.new(subject, reply, data)
      waiting.signal
      return
    end
  end

  # Async subscription: call back with the arguments the handler takes
  handler = sub.callback
  return unless handler

  arity = handler.arity
  if arity == 0
    handler.call
  elsif arity == 1
    handler.call(data)
  elsif arity == 2
    handler.call(data, reply)
  else
    handler.call(data, reply, subject)
  end
end

#process_ping ⇒ Object

Received a ping so respond back with a pong



371
372
373
374
375
376
# File 'lib/nats/io/client.rb', line 371

# Received a PING from the server: queue the PONG reply and kick the
# flusher so that it goes out.
def process_ping
  @pending_queue << PONG_RESPONSE
  @flush_queue << :ping

  # A fresh condition is appended to the pong waiters as well.
  # NOTE(review): nothing visible here ever waits on this condition, and
  # it stays queued until a later PONG or #close removes it — confirm
  # this is intended.
  @pongs.synchronize { @pongs.push(@pongs.new_cond) }
end

#process_pong ⇒ Object

Received a pong, so signal a pending flush waiter if there is one. (This is the first of the methods only used by the parser.)



360
361
362
363
364
365
366
367
368
# File 'lib/nats/io/client.rb', line 360

# Received a PONG from the server: wakes up one pending pong waiter, if
# there is any, and updates the ping/pong counters.
def process_pong
  @pongs.synchronize do
    # NOTE(review): pop removes the most recently added waiter, whereas
    # the original comment spoke of the "first" one — confirm the order.
    waiter = @pongs.pop
    waiter.signal unless waiter.nil?
  end

  @pings_outstanding -= 1
  @pongs_received += 1
end

#publish(subject, msg = EMPTY_MSG, opt_reply = nil, &blk) ⇒ Object

Raises:

  • (BadSubject)



232
233
234
235
236
237
238
239
240
241
242
# File 'lib/nats/io/client.rb', line 232

# Publishes +msg+ on +subject+, optionally carrying a reply subject.
#
# @param subject [String] destination subject
# @param msg [String] payload, empty by default
# @param opt_reply [String, nil] subject on which replies are expected
# @raise [BadSubject] when the subject is missing or empty
def publish(subject, msg=EMPTY_MSG, opt_reply=nil, &blk)
  raise BadSubject unless subject && !subject.empty?

  payload_size = msg.bytesize

  # Accounting
  @stats[:out_msgs] += 1
  @stats[:out_bytes] += payload_size

  command = "PUB #{subject} #{opt_reply} #{payload_size}\r\n#{msg}\r\n"
  send_command(command)

  # Only kick the flusher when there is no kick pending already
  @flush_queue << :pub if @flush_queue.empty?
end

#reconnecting? ⇒ Boolean

Returns:

  • (Boolean)


520
521
522
# File 'lib/nats/io/client.rb', line 520

# True while the status equals RECONNECTING.
# NOTE(review): the code which sets that state is not visible here.
def reconnecting?
  @status == RECONNECTING
end

#request(subject, payload, opts = {}, &blk) ⇒ Object

Sends a request expecting a single response or raises a timeout in case the request is not retrieved within the specified deadline. If given a callback, then the request happens asynchronously.



267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
# File 'lib/nats/io/client.rb', line 267

# Sends a request on +subject+ and collects the reply through a fresh inbox.
#
# With a block the request is asynchronous: a subscription on the inbox is
# created (opts[:max] defaults to 1), the request is published and the sid
# of that subscription is returned.
#
# Without a block the call blocks: a subscription with a future is
# registered by hand, limited to a single message, and the calling thread
# waits on the future for up to opts[:timeout] seconds (0.5 by default).
#
# Note that +opts+ is modified in place (:max, :timeout).
#
# @param subject [String] subject to send the request to; nil returns at once
# @param payload [String] request body
# @param opts [Hash] :timeout (sync only), :max and whatever subscribe accepts
# @return the sid (with a block), the response stored on the subscription
#   (without a block; nil when none arrived), or nil without a subject
# NOTE(review): with_nats_timeout is presumably what raises on a missed
# deadline — its definition is not visible here.
def request(subject, payload, opts={}, &blk)
  return unless subject
  inbox = new_inbox

  # If a callback was passed, then have it process
  # the messages asynchronously and return the sid.
  if blk
    opts[:max] ||= 1
    # The wrapper adapts to the arity of the block which was given
    s = subscribe(inbox, opts) do |msg, reply|
      case blk.arity
      when 0 then blk.call
      when 1 then blk.call(msg)
      else blk.call(msg, reply)
      end
    end
    publish(subject, payload, inbox)

    return s
  end

  # In case block was not given, handle synchronously
  # with a timeout and only allow a single response.
  timeout = opts[:timeout] ||= 0.5
  opts[:max] = 1

  # Build the subscription by hand so that it carries a future, which
  # process_msg signals once the response has been stored.
  sub = Subscription.new
  sub.subject = inbox
  sub.received = 0
  future = sub.new_cond
  sub.future = future

  # Allocate the sid and register the subscription under the client lock
  sid = nil
  synchronize do
    sid = (@ssid += 1)
    @subs[sid] = sub
  end

  send_command("SUB #{inbox} #{sid}#{CR_LF}")
  @flush_queue << :sub
  # Have the subscription removed after a single message
  unsubscribe(sid, 1)

  sub.synchronize do
    # Publish the request and then wait for the response...
    # The subscription lock is taken before publishing, so the response
    # cannot be signalled before this thread is waiting.
    publish(subject, payload, inbox)

    with_nats_timeout(timeout) do
      future.wait(timeout)
    end
  end
  response = sub.response

  response
end

#subscribe(subject, opts = {}, &callback) ⇒ Object

Creates a subscription whose messages are dispatched asynchronously to a callback.



246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
# File 'lib/nats/io/client.rb', line 246

# Creates a subscription whose messages are dispatched asynchronously
# to the given callback.
#
# The subscription is filled in completely before it gets registered,
# and both the sid allocation and the registration happen under the
# client lock, the same way #request does it. That way a concurrent
# subscriber cannot be handed the same sid and the message dispatcher
# never sees a half-initialized subscription.
#
# @param subject [String] subject to subscribe to
# @param opts [Hash] :queue (queue group name) and :max (auto-unsubscribe
#   after that many messages)
# @return [Integer] the sid of the new subscription
def subscribe(subject, opts={}, &callback)
  sub = Subscription.new
  sub.subject = subject
  sub.callback = callback
  sub.received = 0
  sub.queue = opts[:queue] if opts[:queue]
  sub.max = opts[:max] if opts[:max]

  sid = nil
  synchronize do
    sid = (@ssid += 1)
    @subs[sid] = sub
  end

  send_command("SUB #{subject} #{opts[:queue]} #{sid}#{CR_LF}")
  @flush_queue << :sub

  # Setup server support for auto-unsubscribe when receiving enough messages
  unsubscribe(sid, opts[:max]) if opts[:max]

  sid
end

#unsubscribe(sid, opt_max = nil) ⇒ Object

Unsubscribes by sending an UNSUB command to the server (optionally asking for automatic unsubscription after opt_max messages) and discards the local subscription if it is present and has already received enough messages.



323
324
325
326
327
328
329
330
331
332
333
# File 'lib/nats/io/client.rb', line 323

# Sends UNSUB for +sid+ to the server and updates the local subscription.
#
# With +opt_max+ the server is asked to unsubscribe automatically after
# that many messages; locally the subscription is kept only while it has
# received fewer messages than that, otherwise it is dropped right away.
#
# @param sid [Integer] subscription id
# @param opt_max [Integer, nil] number of messages after which to unsubscribe
def unsubscribe(sid, opt_max=nil)
  max_suffix = opt_max.nil? ? nil : " #{opt_max}"
  send_command("UNSUB #{sid}#{max_suffix}#{CR_LF}")
  @flush_queue << :unsub

  sub = @subs[sid]
  return unless sub

  synchronize do
    sub.max = opt_max
    still_expecting = sub.max && (sub.received < sub.max)
    @subs.delete(sid) unless still_expecting
  end
end