Class: NATS::KeyWatcher

Inherits:
Object
  • Object
show all
Includes:
Enumerable, MonitorMixin
Defined in:
lib/nats/io/kv.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(js) ⇒ KeyWatcher

Returns a new instance of KeyWatcher.



471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
# File 'lib/nats/io/kv.rb', line 471

# Sets up a watcher around the given +js+ handle with an empty, bounded
# update queue and all subscription state cleared.
#
# @param js the object stored in @js (presumably a JetStream context; confirm against callers)
def initialize(js)
  # MonitorMixin only works once its own initializer has run, hence the
  # explicit zero-argument super.
  super()

  @js = js
  @_sub = nil
  @pending = nil
  @_init_done = false
  # Bounded queue: producers block once 256 entries are waiting.
  @_updates = SizedQueue.new(256)

  # Ordered consumer related
  @_dseq, @_sseq = 1, 0
  @_cmeta = nil
  @_fcr = @_fciseq = 0
  @_hb_task = nil
  @_active = true
end

Instance Attribute Details

#_active ⇒ Object

Returns the value of attribute _active.



469
470
471
# File 'lib/nats/io/kv.rb', line 469

# Reader for @_active.
def _active = @_active

#_dseq ⇒ Object

Returns the value of attribute _dseq.



469
470
471
# File 'lib/nats/io/kv.rb', line 469

# Reader for @_dseq.
def _dseq = @_dseq

#_hb_task ⇒ Object

Returns the value of attribute _hb_task.



469
470
471
# File 'lib/nats/io/kv.rb', line 469

# Reader for @_hb_task.
def _hb_task = @_hb_task

#_init_done ⇒ Object

Returns the value of attribute _init_done.



468
469
470
# File 'lib/nats/io/kv.rb', line 468

# Reader for @_init_done.
def _init_done = @_init_done

#_sseq ⇒ Object

Returns the value of attribute _sseq.



469
470
471
# File 'lib/nats/io/kv.rb', line 469

# Reader for @_sseq.
def _sseq = @_sseq

#_sub ⇒ Object

Returns the value of attribute _sub.



468
469
470
# File 'lib/nats/io/kv.rb', line 468

# Reader for @_sub.
def _sub = @_sub

#_updates ⇒ Object

Returns the value of attribute _updates.



468
469
470
# File 'lib/nats/io/kv.rb', line 468

# Reader for @_updates.
def _updates = @_updates

#_watcher_cond ⇒ Object

Returns the value of attribute _watcher_cond.



468
469
470
# File 'lib/nats/io/kv.rb', line 468

# Reader for @_watcher_cond; yields nil while that variable is unassigned.
def _watcher_cond = @_watcher_cond

#pending ⇒ Object

Returns the value of attribute pending.



468
469
470
# File 'lib/nats/io/kv.rb', line 468

# Reader for @pending.
def pending = @pending

#received ⇒ Object

Returns the value of attribute received.



468
469
470
# File 'lib/nats/io/kv.rb', line 468

# Reader for @received; yields nil while that variable is unassigned.
def received = @received

Instance Method Details

#each ⇒ Object

Implements Enumerable.



504
505
506
507
508
509
# File 'lib/nats/io/kv.rb', line 504

# Implements Enumerable by yielding every item popped from the update queue.
#
# The loop has no exit condition of its own: it blocks in Queue#pop while
# the queue is empty and only ends when the caller breaks out of the block
# (or an exception propagates).
#
# @yield [entry] each item taken from @_updates, in queue order
# @return [Enumerator] when called without a block
def each
  # Without this guard a blockless call would pop one item and then lose it
  # to a LocalJumpError raised by yield.
  return enum_for(:each) unless block_given?

  loop { yield @_updates.pop }
end

#stop ⇒ Object



488
489
490
491
# File 'lib/nats/io/kv.rb', line 488

# Shuts down the heartbeat task and unsubscribes the underlying subscription.
#
# Both @_hb_task and @_sub start out as nil in the constructor, so each call
# uses safe navigation: stopping a watcher that was never fully wired up is
# a no-op rather than a NoMethodError.
#
# @return whatever @_sub.unsubscribe returns, or nil when there is no subscription
def stop
  @_hb_task&.shutdown
  @_sub&.unsubscribe
end

#take(n) ⇒ Object



511
512
513
# File 'lib/nats/io/kv.rb', line 511

# Takes up to +n+ items via the inherited #take and drops any nil entries,
# so the result may contain fewer than +n+ elements.
#
# @param n [Integer] maximum number of items to pull
# @return [Array] the non-nil items among the first +n+
def take(n)
  # Bare super forwards n; the inherited result is already capped at n
  # elements, so no second take is needed before stripping nils.
  super.compact
end

#updates(params = {}) ⇒ Object



493
494
495
496
497
498
499
500
501
# File 'lib/nats/io/kv.rb', line 493

# Pops the next item from the update queue, waiting at most the given timeout.
#
# The timeout is read into a local instead of being written back into
# +params+, so the caller's hash is left untouched (and may be frozen).
#
# @param params [Hash] options; +:timeout+ is the wait in seconds and
#   falls back to 5 when missing, nil or false
# @return the popped item, or nil when Queue#pop gives up after the timeout
# NOTE(review): MonotonicTime.with_nats_timeout is defined elsewhere and may
# raise on its own deadline; confirm its contract.
def updates(params = {})
  timeout = params[:timeout] || 5
  result = nil
  MonotonicTime.with_nats_timeout(timeout) do
    result = @_updates.pop(timeout: timeout)
  end

  result
end