Class: NATS::Subscription

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/nats/io/subscription.rb

Overview

A Subscription represents interest in a given subject.

Examples:

Create NATS subscription with callback.

require 'nats/client'

nc = NATS.connect("demo.nats.io")
sub = nc.subscribe("foo") do |msg|
  puts "Received [#{msg.subject}]: #{msg.data}"
end

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(**opts) ⇒ Subscription

Returns a new instance of Subscription.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/nats/io/subscription.rb', line 37

# Builds a subscription with every piece of state set to its empty default.
#
# @param opts [Hash] keyword options; only :processing_concurrency is read,
#   falling back to NATS::IO::DEFAULT_SINGLE_SUB_CONCURRENCY when absent.
def initialize(**opts)
  # MonitorMixin has to be initialized before the monitor can be used.
  super()

  # Identity and bookkeeping.
  @subject = ''
  @queue = nil
  @future = nil
  @callback = nil
  @response = nil
  @received = 0
  @max = nil
  @pending = nil
  @sid = nil
  @nc = nil
  @closed = nil

  # Message delivery state for async subscribers.
  @pending_queue = nil
  @pending_size = 0
  @pending_msgs_limit = nil
  @pending_bytes_limit = nil

  # Condition used by sync subscribers.
  @wait_for_msgs_cond = nil

  # Cap on messages processed concurrently (1 means strictly sequential).
  @processing_concurrency = opts.fetch(
    :processing_concurrency,
    NATS::IO::DEFAULT_SINGLE_SUB_CONCURRENCY
  )
end

Instance Attribute Details

#callback ⇒ Object

Returns the value of attribute callback.



30
31
32
# File 'lib/nats/io/subscription.rb', line 30

# Reader for @callback, the handler #process invokes for each message
# (nil after #initialize, in which case #process does nothing).
def callback
  @callback
end

#closed ⇒ Object

Returns the value of attribute closed.



35
36
37
# File 'lib/nats/io/subscription.rb', line 35

# Reader for @closed, which #initialize sets to nil.
# NOTE(review): presumably marks a closed subscription; confirm against the
# code that assigns it.
def closed
  @closed
end

#concurrency_semaphore ⇒ Object

Returns the value of attribute concurrency_semaphore.



31
32
33
# File 'lib/nats/io/subscription.rb', line 31

# Reader for @concurrency_semaphore, the permit counter that
# #enqueue_processing acquires and releases; #processing_concurrency=
# replaces it with a new Concurrent::Semaphore.
# NOTE(review): #initialize does not assign this ivar; confirm where the
# first semaphore is created.
def concurrency_semaphore
  @concurrency_semaphore
end

#future ⇒ Object

Returns the value of attribute future.



30
31
32
# File 'lib/nats/io/subscription.rb', line 30

# Reader for @future (nil after #initialize).
# NOTE(review): its purpose is not visible in this section; confirm against
# the code that assigns it.
def future
  @future
end

#jsi ⇒ Object

Returns the value of attribute jsi.



34
35
36
# File 'lib/nats/io/subscription.rb', line 34

# Reader for @jsi.
# NOTE(review): not assigned in #initialize; presumably JetStream-related
# subscription info, but confirm against the code that sets it.
def jsi
  @jsi
end

#max ⇒ Object

Returns the value of attribute max.



30
31
32
# File 'lib/nats/io/subscription.rb', line 30

# Reader for @max (nil after #initialize).
# NOTE(review): presumably the auto-unsubscribe message limit; confirm
# against the connection's unsubscribe logic.
def max
  @max
end

#nc ⇒ Object

Returns the value of attribute nc.



33
34
35
# File 'lib/nats/io/subscription.rb', line 33

# Reader for @nc, the connection object this subscription delegates to:
# #process uses its reloader and error callback, and #unsubscribe forwards
# to it. nil after #initialize.
def nc
  @nc
end

#pending ⇒ Object

Returns the value of attribute pending.



30
31
32
# File 'lib/nats/io/subscription.rb', line 30

# Reader for @pending (nil after #initialize).
# NOTE(review): no use of this value is visible in this section; confirm
# its meaning against the code that assigns it.
def pending
  @pending
end

#pending_bytes_limit ⇒ Object

Returns the value of attribute pending_bytes_limit.



32
33
34
# File 'lib/nats/io/subscription.rb', line 32

# Reader for @pending_bytes_limit (nil after #initialize).
# NOTE(review): presumably the cap that pending_size is checked against;
# the check itself is not in this section, so confirm.
def pending_bytes_limit
  @pending_bytes_limit
end

#pending_msgs_limit ⇒ Object

Returns the value of attribute pending_msgs_limit.



32
33
34
# File 'lib/nats/io/subscription.rb', line 32

# Reader for @pending_msgs_limit (nil after #initialize).
# NOTE(review): presumably the cap on queued messages; the check itself is
# not in this section, so confirm.
def pending_msgs_limit
  @pending_msgs_limit
end

#pending_queue ⇒ Object

Returns the value of attribute pending_queue.



31
32
33
# File 'lib/nats/io/subscription.rb', line 31

# Reader for @pending_queue, the queue #dispatch appends messages to and
# that #enqueue_processing and #next_msg pop from. nil after #initialize.
def pending_queue
  @pending_queue
end

#pending_size ⇒ Object

Returns the value of attribute pending_size.



31
32
33
# File 'lib/nats/io/subscription.rb', line 31

# Reader for @pending_size, a running total of msg.data.size: #dispatch adds
# each incoming message's size and #process subtracts it. Starts at 0.
def pending_size
  @pending_size
end

#queue ⇒ Object

Returns the value of attribute queue.



30
31
32
# File 'lib/nats/io/subscription.rb', line 30

# Reader for @queue, the value #inspect prints as "queue" (nil after
# #initialize).
def queue
  @queue
end

#received ⇒ Object

Returns the value of attribute received.



30
31
32
# File 'lib/nats/io/subscription.rb', line 30

# Reader for @received, which #initialize starts at 0.
# NOTE(review): presumably a count of delivered messages; the increment is
# not in this section, so confirm.
def received
  @received
end

#response ⇒ Object

Returns the value of attribute response.



30
31
32
# File 'lib/nats/io/subscription.rb', line 30

# Reader for @response (nil after #initialize).
# NOTE(review): no use of this value is visible in this section; confirm
# its meaning against the code that assigns it.
def response
  @response
end

#sid ⇒ Object

Returns the value of attribute sid.



30
31
32
# File 'lib/nats/io/subscription.rb', line 30

# Reader for @sid, the value #inspect prints as "sid" (nil after
# #initialize). Presumably the subscription id; confirm.
def sid
  @sid
end

#subject ⇒ Object

Returns the value of attribute subject.



30
31
32
# File 'lib/nats/io/subscription.rb', line 30

# Reader for @subject, the subject this subscription is interested in
# (an empty string after #initialize).
def subject
  @subject
end

#wait_for_msgs_cond ⇒ Object

Returns the value of attribute wait_for_msgs_cond.



31
32
33
# File 'lib/nats/io/subscription.rb', line 31

# Reader for @wait_for_msgs_cond, the condition #next_msg waits on and
# #dispatch signals when it is set. nil after #initialize.
def wait_for_msgs_cond
  @wait_for_msgs_cond
end

Instance Method Details

#dispatch(msg) ⇒ Object



108
109
110
111
112
113
114
115
116
117
# File 'lib/nats/io/subscription.rb', line 108

# Accepts a newly delivered message: queues it, adds its payload size to the
# pending total, then wakes whichever consumer style this subscription uses.
#
# @param msg [Object] message responding to #data (whose #size is counted).
def dispatch(msg)
  pending_queue << msg
  synchronize do
    self.pending_size += msg.data.size
  end

  # Async subscribers (those with a callback) have the message handed to the
  # connection's subscription executor for processing.
  if callback
    enqueue_processing(@nc.subscription_executor)
  end

  # Sync subscribers are signalled that a new message is available.
  cond = wait_for_msgs_cond
  cond.signal unless cond.nil?
end

#enqueue_processing(executor) ⇒ Object

Sends a message to a separate thread for processing.



142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/nats/io/subscription.rb', line 142

# Schedules processing of one pending message on +executor+, bounded by the
# permits of #concurrency_semaphore.
#
# Returns immediately (nil) when no permit is available. Otherwise a task is
# posted that pops one message without blocking, hands it to #process, gives
# the permit back and re-enqueues itself for whatever is still queued.
#
# @param executor [#post] object that runs the given block, typically on
#   another thread.
def enqueue_processing(executor)
  # No free permit: tasks already in flight re-enqueue follow-up work
  # themselves once they finish.
  concurrency_semaphore.try_acquire || return
  executor.post do
    msg = pending_queue.pop(true)
    process(msg)
  rescue ThreadError
    # Non-blocking pop on an empty queue: nothing to process. The permit is
    # handed back by the ensure clause below; releasing it here as well would
    # return two permits for a single acquire and silently raise the
    # concurrency limit.
  ensure
    concurrency_semaphore.release
    # Schedule as many follow-up tasks as there are both free permits and
    # queued messages.
    [concurrency_semaphore.available_permits, pending_queue.size].min.times do
      enqueue_processing(executor)
    end
  end
end

#inspect ⇒ Object



104
105
106
# File 'lib/nats/io/subscription.rb', line 104

# Short description of the subscription showing its subject, queue and sid.
#
# @return [String]
def inspect
  format(
    '#<NATS::Subscription(subject: "%s", queue: "%s", sid: %s)>',
    @subject, @queue, @sid
  )
end

#next_msg(opts = {}) ⇒ Object

next_msg blocks, waiting for the next message to be received.



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/nats/io/subscription.rb', line 86

# Returns the next queued message, waiting up to a timeout for one to arrive.
#
# @param opts [Hash] options; :timeout is the wait in seconds (0.5 when the
#   key is missing or nil). The hash is only read, never modified.
# @return [Object] the message popped from the pending queue.
# @raise [NATS::Timeout] if the queue is still empty after waiting.
#
# NOTE(review): messages taken here do not adjust pending_size, while
# #dispatch adds to it and #process subtracts from it. Confirm whether sync
# subscriptions are meant to be excluded from that accounting.
def next_msg(opts = {})
  # Use || rather than ||= so the default is not written back into the
  # caller's hash (which would also fail on a frozen hash).
  timeout = opts[:timeout] || 0.5
  synchronize do
    return @pending_queue.pop unless @pending_queue.empty?

    # Wait for a bit until getting a signal.
    MonotonicTime.with_nats_timeout(timeout) do
      wait_for_msgs_cond.wait(timeout)
    end

    raise NATS::Timeout if @pending_queue.empty?

    @pending_queue.pop
  end
end

#process(msg) ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/nats/io/subscription.rb', line 119

# Runs the subscription callback for +msg+. Does nothing without a callback.
#
# The callback is invoked inside nc.reloader.call; any StandardError it
# raises is passed to the connection's err_cb_call instead of propagating.
#
# @param msg [Object] message responding to #data, #reply, #subject, #header.
def process(msg)
  return unless callback

  # The message has been consumed, so it no longer counts as pending.
  synchronize do
    self.pending_size -= msg.data.size
  end

  nc.reloader.call do
    # Older callback signatures are still honoured for backwards
    # compatibility; a single-argument callback receiving the message itself
    # is the recommended form.
    args =
      case callback.arity
      when 0 then []
      when 1 then [msg]
      when 2 then [msg.data, msg.reply]
      when 3 then [msg.data, msg.reply, msg.subject]
      else [msg.data, msg.reply, msg.subject, msg.header]
      end
    callback.call(*args)
  rescue => e
    synchronize { nc.send(:err_cb_call, nc, e, self) }
  end
end

#processing_concurrency=(value) ⇒ Object

Concurrency of message processing for a single subscription. 1 means sequential processing; 2 or more allows messages to be processed concurrently and possibly out of order.

Raises:

  • (ArgumentError)


67
68
69
70
71
72
73
# File 'lib/nats/io/subscription.rb', line 67

# Sets how many messages of this subscription may be processed at once and
# installs a fresh semaphore with that many permits. Assigning the current
# value is a no-op, so the existing semaphore is kept.
#
# @param value [Integer] positive number of concurrent processors
#   (1 means sequential processing).
# @raise [ArgumentError] if +value+ is not a positive Integer.
def processing_concurrency=(value)
  # Check the type first: nil or a String has no #positive? and would raise
  # NoMethodError instead of the documented ArgumentError, and a Float such
  # as 1.5 would slip through as "positive".
  unless value.is_a?(Integer) && value.positive?
    raise ArgumentError, "nats: subscription processing concurrency must be positive integer"
  end
  return if @processing_concurrency == value

  @processing_concurrency = value
  @concurrency_semaphore = Concurrent::Semaphore.new(value)
end

#unsubscribe(opt_max = nil) ⇒ Object

Auto-unsubscribes from the server by sending an UNSUB command, and throws away the subscription if it is already present and has received enough messages.



81
82
83
# File 'lib/nats/io/subscription.rb', line 81

# Asks the owning connection to unsubscribe this subscription.
#
# @param opt_max [Object, nil] passed through unchanged to the connection's
#   unsubscribe; presumably a message count after which the subscription is
#   dropped (confirm against the connection code).
def unsubscribe(opt_max = nil)
  connection = @nc
  # Uses send because the connection's unsubscribe may not be public.
  connection.send(:unsubscribe, self, opt_max)
end