Class: NATS::KeyWatcher
- Inherits:
-
Object
- Object
- NATS::KeyWatcher
- Includes:
- Enumerable, MonitorMixin
- Defined in:
- lib/nats/io/kv.rb
Instance Attribute Summary collapse
-
#_active ⇒ Object
Returns the value of attribute _active.
-
#_dseq ⇒ Object
Returns the value of attribute _dseq.
-
#_hb_task ⇒ Object
Returns the value of attribute _hb_task.
-
#_init_done ⇒ Object
Returns the value of attribute _init_done.
-
#_sseq ⇒ Object
Returns the value of attribute _sseq.
-
#_sub ⇒ Object
Returns the value of attribute _sub.
-
#_updates ⇒ Object
Returns the value of attribute _updates.
-
#_watcher_cond ⇒ Object
Returns the value of attribute _watcher_cond.
-
#pending ⇒ Object
Returns the value of attribute pending.
-
#received ⇒ Object
Returns the value of attribute received.
Instance Method Summary collapse
-
#each ⇒ Object
Implements Enumerable.
-
#initialize(js) ⇒ KeyWatcher
constructor
A new instance of KeyWatcher.
- #stop ⇒ Object
- #take(n) ⇒ Object
- #updates(params = {}) ⇒ Object
Constructor Details
#initialize(js) ⇒ KeyWatcher
Returns a new instance of KeyWatcher.
471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 |
# File 'lib/nats/io/kv.rb', line 471 def initialize(js) super() # required to initialize monitor @js = js @_sub = nil @_updates = SizedQueue.new(256) @_init_done = false @pending = nil # Ordered consumer related @_dseq = 1 @_sseq = 0 @_cmeta = nil @_fcr = 0 @_fciseq = 0 @_active = true @_hb_task = nil end |
Instance Attribute Details
#_active ⇒ Object
Returns the value of attribute _active.
469 470 471 |
# File 'lib/nats/io/kv.rb', line 469 def _active @_active end |
#_dseq ⇒ Object
Returns the value of attribute _dseq.
469 470 471 |
# File 'lib/nats/io/kv.rb', line 469 def _dseq @_dseq end |
#_hb_task ⇒ Object
Returns the value of attribute _hb_task.
469 470 471 |
# File 'lib/nats/io/kv.rb', line 469 def _hb_task @_hb_task end |
#_init_done ⇒ Object
Returns the value of attribute _init_done.
468 469 470 |
# File 'lib/nats/io/kv.rb', line 468 def _init_done @_init_done end |
#_sseq ⇒ Object
Returns the value of attribute _sseq.
469 470 471 |
# File 'lib/nats/io/kv.rb', line 469 def _sseq @_sseq end |
#_sub ⇒ Object
Returns the value of attribute _sub.
468 469 470 |
# File 'lib/nats/io/kv.rb', line 468 def _sub @_sub end |
#_updates ⇒ Object
Returns the value of attribute _updates.
468 469 470 |
# File 'lib/nats/io/kv.rb', line 468 def _updates @_updates end |
#_watcher_cond ⇒ Object
Returns the value of attribute _watcher_cond.
468 469 470 |
# File 'lib/nats/io/kv.rb', line 468 def _watcher_cond @_watcher_cond end |
#pending ⇒ Object
Returns the value of attribute pending.
468 469 470 |
# File 'lib/nats/io/kv.rb', line 468 def pending @pending end |
#received ⇒ Object
Returns the value of attribute received.
468 469 470 |
# File 'lib/nats/io/kv.rb', line 468 def received @received end |
Instance Method Details
#each ⇒ Object
Implements Enumerable.
504 505 506 507 508 509 |
# File 'lib/nats/io/kv.rb', line 504 def each loop do result = @_updates.pop yield result end end |
#stop ⇒ Object
488 489 490 491 |
# File 'lib/nats/io/kv.rb', line 488 def stop @_hb_task.shutdown @_sub.unsubscribe end |
#take(n) ⇒ Object
511 512 513 |
# File 'lib/nats/io/kv.rb', line 511 def take(n) super.take(n).reject { |entry| entry.nil? } end |
#updates(params = {}) ⇒ Object
493 494 495 496 497 498 499 500 501 |
# File 'lib/nats/io/kv.rb', line 493 def updates(params = {}) params[:timeout] ||= 5 result = nil MonotonicTime.with_nats_timeout(params[:timeout]) do result = @_updates.pop(timeout: params[:timeout]) end result end |