Class: NATS::Subscription

Inherits:
Object show all
Includes:
MonitorMixin
Defined in:
lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/subscription.rb

Overview

A Subscription represents interest in a given subject.

Examples:

Create NATS subscription with callback.

require 'nats/client'

nc = NATS.connect("demo.nats.io")
# The block receives each message delivered on "foo"; print its subject
# followed by its payload.
sub = nc.subscribe("foo") do |msg|
  puts "Received [#{msg.subject}]: #{msg.data}"
end

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Subscription

Returns a new instance of Subscription.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/subscription.rb', line 37

# Builds a subscription with all of its state reset to defaults.
#
# Takes no arguments; it only assigns initial values to the instance
# variables that back the attribute readers. Every attribute starts as nil
# except @subject (empty string), @received and @pending_size (0) and
# @is_slow_consumer (false).
def initialize
  super # required to initialize monitor
  @subject  = ''
  @queue    = nil
  @future   = nil
  @callback = nil
  @response = nil
  @received = 0
  @max      = nil
  @pending  = nil
  @sid      = nil
  @nc       = nil
  @closed   = nil
  # Initialised explicitly so that every attribute exposed by a reader is
  # defined on a fresh instance, like the others above.
  @jsi      = nil

  # State from async subscriber messages delivery
  @pending_queue       = nil
  @pending_size        = 0
  @pending_msgs_limit  = nil
  @pending_bytes_limit = nil
  @wait_for_msgs_t     = nil
  @is_slow_consumer    = false

  # Sync subscriber
  @wait_for_msgs_cond = nil
end

Instance Attribute Details

#callback ⇒ Object

Returns the value of attribute callback.



30
31
32
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/subscription.rb', line 30

# Reader for the +callback+ attribute.
#
# @return [Object] whatever is currently stored in @callback
def callback = @callback

#closed ⇒ Object

Returns the value of attribute closed.



35
36
37
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/subscription.rb', line 35

# Reader for the +closed+ attribute.
#
# @return [Object] whatever is currently stored in @closed
def closed = @closed

#future ⇒ Object

Returns the value of attribute future.



30
31
32
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/subscription.rb', line 30

# Reader for the +future+ attribute.
#
# @return [Object] whatever is currently stored in @future
def future = @future

#is_slow_consumer ⇒ Object

Returns the value of attribute is_slow_consumer.



31
32
33
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/subscription.rb', line 31

# Reader for the +is_slow_consumer+ attribute.
#
# @return [Object] whatever is currently stored in @is_slow_consumer
def is_slow_consumer = @is_slow_consumer

#jsi ⇒ Object

Returns the value of attribute jsi.



34
35
36
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/subscription.rb', line 34

# Reader for the +jsi+ attribute.
#
# @return [Object] whatever is currently stored in @jsi
def jsi = @jsi

#max ⇒ Object

Returns the value of attribute max.



30
31
32
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/subscription.rb', line 30

# Reader for the +max+ attribute.
#
# @return [Object] whatever is currently stored in @max
def max = @max

#nc ⇒ Object

Returns the value of attribute nc.



33
34
35
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/subscription.rb', line 33

# Reader for the +nc+ attribute.
#
# @return [Object] whatever is currently stored in @nc
def nc = @nc

#pending ⇒ Object

Returns the value of attribute pending.



30
31
32
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/subscription.rb', line 30

# Reader for the +pending+ attribute.
#
# @return [Object] whatever is currently stored in @pending
def pending = @pending

#pending_bytes_limit ⇒ Object

Returns the value of attribute pending_bytes_limit.



32
33
34
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/subscription.rb', line 32

# Reader for the +pending_bytes_limit+ attribute.
#
# @return [Object] whatever is currently stored in @pending_bytes_limit
def pending_bytes_limit = @pending_bytes_limit

#pending_msgs_limit ⇒ Object

Returns the value of attribute pending_msgs_limit.



32
33
34
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/subscription.rb', line 32

# Reader for the +pending_msgs_limit+ attribute.
#
# @return [Object] whatever is currently stored in @pending_msgs_limit
def pending_msgs_limit = @pending_msgs_limit

#pending_queue ⇒ Object

Returns the value of attribute pending_queue.



31
32
33
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/subscription.rb', line 31

# Reader for the +pending_queue+ attribute.
#
# @return [Object] whatever is currently stored in @pending_queue
def pending_queue = @pending_queue

#pending_size ⇒ Object

Returns the value of attribute pending_size.



31
32
33
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/subscription.rb', line 31

# Reader for the +pending_size+ attribute.
#
# @return [Object] whatever is currently stored in @pending_size
def pending_size = @pending_size

#queue ⇒ Object

Returns the value of attribute queue.



30
31
32
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/subscription.rb', line 30

# Reader for the +queue+ attribute.
#
# @return [Object] whatever is currently stored in @queue
def queue = @queue

#received ⇒ Object

Returns the value of attribute received.



30
31
32
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/subscription.rb', line 30

# Reader for the +received+ attribute.
#
# @return [Object] whatever is currently stored in @received
def received = @received

#response ⇒ Object

Returns the value of attribute response.



30
31
32
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/subscription.rb', line 30

# Reader for the +response+ attribute.
#
# @return [Object] whatever is currently stored in @response
def response = @response

#sid ⇒ Object

Returns the value of attribute sid.



30
31
32
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/subscription.rb', line 30

# Reader for the +sid+ attribute.
#
# @return [Object] whatever is currently stored in @sid
def sid = @sid

#subject ⇒ Object

Returns the value of attribute subject.



30
31
32
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/subscription.rb', line 30

# Reader for the +subject+ attribute.
#
# @return [Object] whatever is currently stored in @subject
def subject = @subject

#wait_for_msgs_cond ⇒ Object

Returns the value of attribute wait_for_msgs_cond.



31
32
33
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/subscription.rb', line 31

# Reader for the +wait_for_msgs_cond+ attribute.
#
# @return [Object] whatever is currently stored in @wait_for_msgs_cond
def wait_for_msgs_cond = @wait_for_msgs_cond

#wait_for_msgs_t ⇒ Object

Returns the value of attribute wait_for_msgs_t.



31
32
33
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/subscription.rb', line 31

# Reader for the +wait_for_msgs_t+ attribute.
#
# @return [Object] whatever is currently stored in @wait_for_msgs_t
def wait_for_msgs_t = @wait_for_msgs_t

Instance Method Details

#inspect ⇒ Object



88
89
90
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/subscription.rb', line 88

# Short human-readable description of the subscription.
#
# The subject and queue are rendered inside double quotes and the sid is
# rendered bare; a nil value interpolates as an empty string.
#
# @return [String]
def inspect
  %(#<NATS::Subscription(subject: "#{@subject}", queue: "#{@queue}", sid: #{@sid})>)
end

#next_msg(opts = {}) ⇒ Object

next_msg blocks, waiting for the next message to be received.



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/subscription.rb', line 70

# Returns the next message from @pending_queue, waiting for one to arrive
# if the queue is currently empty.
#
# The whole operation runs inside +synchronize+. If a message is already
# queued it is popped and returned immediately; otherwise the method waits
# on +wait_for_msgs_cond+ for up to the timeout and then checks the queue
# once more.
#
# @param opts [Hash] options; only +:timeout+ is read, and the hash is
#   never modified
# @option opts [Numeric] :timeout seconds to wait; 0.5 when missing or nil
# @return [Object] the message popped from @pending_queue
# @raise [NATS::Timeout] if the queue is still empty after waiting
#   NOTE(review): MonotonicTime.with_nats_timeout is defined elsewhere and
#   presumably raises too when the wait overruns — confirm.
def next_msg(opts={})
  # Read the default without `||=`: writing it back would mutate the
  # caller's hash and raise FrozenError for a frozen one.
  timeout = opts[:timeout] || 0.5

  synchronize do
    return @pending_queue.pop unless @pending_queue.empty?

    # Wait for a bit until getting a signal.
    MonotonicTime::with_nats_timeout(timeout) do
      wait_for_msgs_cond.wait(timeout)
    end

    raise NATS::Timeout if @pending_queue.empty?

    return @pending_queue.pop
  end
end

#unsubscribe(opt_max = nil) ⇒ Object

Auto-unsubscribes from the server by sending an UNSUB command, and discards the subscription if it is already present and has received enough messages.



65
66
67
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/nats-pure-2.2.1/lib/nats/io/subscription.rb', line 65

# Hands the unsubscribe request over to the connection stored in @nc.
#
# The call is dispatched with +send+, so it also reaches an +unsubscribe+
# method on the connection that is not public.
#
# @param opt_max [Object, nil] forwarded unchanged as the final argument
# @return [Object] whatever the connection's +unsubscribe+ returns
def unsubscribe(opt_max=nil)
  connection = @nc
  connection.send(:unsubscribe, self, opt_max)
end