Class: NATS::Subscription
- Inherits:
-
Object
- Object
- NATS::Subscription
- Includes:
- MonitorMixin
- Defined in:
- lib/nats/io/subscription.rb
Overview
A Subscription represents interest in a given subject.
Instance Attribute Summary collapse
-
#callback ⇒ Object
Returns the value of attribute callback.
-
#closed ⇒ Object
Returns the value of attribute closed.
-
#drained ⇒ Object
Returns the value of attribute drained.
-
#future ⇒ Object
Returns the value of attribute future.
-
#jsi ⇒ Object
Returns the value of attribute jsi.
-
#max ⇒ Object
Returns the value of attribute max.
-
#nc ⇒ Object
Returns the value of attribute nc.
-
#pending ⇒ Object
Returns the value of attribute pending.
-
#pending_bytes_limit ⇒ Object
Returns the value of attribute pending_bytes_limit.
-
#pending_msgs_limit ⇒ Object
Returns the value of attribute pending_msgs_limit.
-
#pending_queue ⇒ Object
Returns the value of attribute pending_queue.
-
#pending_size ⇒ Object
Returns the value of attribute pending_size.
-
#queue ⇒ Object
Returns the value of attribute queue.
-
#received ⇒ Object
(also: #delivered)
Returns the value of attribute received.
-
#response ⇒ Object
Returns the value of attribute response.
-
#sid ⇒ Object
Returns the value of attribute sid.
-
#subject ⇒ Object
Returns the value of attribute subject.
-
#wait_for_msgs_cond ⇒ Object
Returns the value of attribute wait_for_msgs_cond.
Instance Method Summary collapse
- #concurrency_semaphore ⇒ Object
- #dispatch(msg) ⇒ Object
-
#enqueue_processing(executor) ⇒ Object
Sends the next pending message to a separate thread for processing.
-
#initialize(**opts) ⇒ Subscription
constructor
A new instance of Subscription.
- #inspect ⇒ Object
-
#next_msg(opts = {}) ⇒ Object
next_msg blocks while waiting for the next message to be received.
- #process(msg) ⇒ Object
-
#processing_concurrency=(value) ⇒ Object
Concurrency of message processing for a single subscription.
-
#unsubscribe(opt_max = nil) ⇒ Object
Automatically unsubscribes from the server by sending an UNSUB command, and discards the subscription if it is already present and has received enough messages.
Constructor Details
#initialize(**opts) ⇒ Subscription
Returns a new instance of Subscription.
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/nats/io/subscription.rb', line 39
# Builds a subscription with every attribute reset to its default value.
#
# @param opts [Hash] keyword options
# @option opts [Integer] :processing_concurrency maximum number of messages
#   processed at the same time for this subscription; defaults to
#   NATS::IO::DEFAULT_SINGLE_SUB_CONCURRENCY when not given
def initialize(**opts)
  super() # required to initialize monitor

  @subject = ""
  @queue = nil
  @future = nil
  @callback = nil
  @response = nil
  @received = 0
  @max = nil
  @pending = nil
  @sid = nil
  @nc = nil
  # Exposed through #jsi, so give it an explicit default like every other attribute.
  @jsi = nil
  @closed = nil
  @drained = false

  # State from async subscriber messages delivery
  @pending_queue = nil
  @pending_size = 0
  @pending_msgs_limit = nil
  @pending_bytes_limit = nil

  # Sync subscriber
  @wait_for_msgs_cond = nil

  # To limit number of concurrent messages being processed (1 to only allow sequential processing)
  @processing_concurrency = opts.fetch(:processing_concurrency, NATS::IO::DEFAULT_SINGLE_SUB_CONCURRENCY)
end
Instance Attribute Details
#callback ⇒ Object
Returns the value of attribute callback.
31 32 33 |
# File 'lib/nats/io/subscription.rb', line 31
# Returns the current value of @callback.
def callback
  @callback
end
#closed ⇒ Object
Returns the value of attribute closed.
36 37 38 |
# File 'lib/nats/io/subscription.rb', line 36
# Returns the current value of @closed.
def closed
  @closed
end
#drained ⇒ Object
Returns the value of attribute drained.
36 37 38 |
# File 'lib/nats/io/subscription.rb', line 36
# Returns the current value of @drained.
def drained
  @drained
end
#future ⇒ Object
Returns the value of attribute future.
31 32 33 |
# File 'lib/nats/io/subscription.rb', line 31
# Returns the current value of @future.
def future
  @future
end
#jsi ⇒ Object
Returns the value of attribute jsi.
35 36 37 |
# File 'lib/nats/io/subscription.rb', line 35
# Returns the current value of @jsi.
def jsi
  @jsi
end
#max ⇒ Object
Returns the value of attribute max.
31 32 33 |
# File 'lib/nats/io/subscription.rb', line 31
# Returns the current value of @max.
def max
  @max
end
#nc ⇒ Object
Returns the value of attribute nc.
34 35 36 |
# File 'lib/nats/io/subscription.rb', line 34
# Returns the current value of @nc.
def nc
  @nc
end
#pending ⇒ Object
Returns the value of attribute pending.
31 32 33 |
# File 'lib/nats/io/subscription.rb', line 31
# Returns the current value of @pending.
def pending
  @pending
end
#pending_bytes_limit ⇒ Object
Returns the value of attribute pending_bytes_limit.
33 34 35 |
# File 'lib/nats/io/subscription.rb', line 33
# Returns the current value of @pending_bytes_limit.
def pending_bytes_limit
  @pending_bytes_limit
end
#pending_msgs_limit ⇒ Object
Returns the value of attribute pending_msgs_limit.
33 34 35 |
# File 'lib/nats/io/subscription.rb', line 33
# Returns the current value of @pending_msgs_limit.
def pending_msgs_limit
  @pending_msgs_limit
end
#pending_queue ⇒ Object
Returns the value of attribute pending_queue.
32 33 34 |
# File 'lib/nats/io/subscription.rb', line 32
# Returns the current value of @pending_queue.
def pending_queue
  @pending_queue
end
#pending_size ⇒ Object
Returns the value of attribute pending_size.
32 33 34 |
# File 'lib/nats/io/subscription.rb', line 32
# Returns the current value of @pending_size.
def pending_size
  @pending_size
end
#queue ⇒ Object
Returns the value of attribute queue.
31 32 33 |
# File 'lib/nats/io/subscription.rb', line 31
# Returns the current value of @queue.
def queue
  @queue
end
#received ⇒ Object Also known as: delivered
Returns the value of attribute received.
31 32 33 |
# File 'lib/nats/io/subscription.rb', line 31
# Returns the current value of @received (also exposed as #delivered).
def received
  @received
end
#response ⇒ Object
Returns the value of attribute response.
31 32 33 |
# File 'lib/nats/io/subscription.rb', line 31
# Returns the current value of @response.
def response
  @response
end
#sid ⇒ Object
Returns the value of attribute sid.
31 32 33 |
# File 'lib/nats/io/subscription.rb', line 31
# Returns the current value of @sid.
def sid
  @sid
end
#subject ⇒ Object
Returns the value of attribute subject.
31 32 33 |
# File 'lib/nats/io/subscription.rb', line 31
# Returns the current value of @subject.
def subject
  @subject
end
#wait_for_msgs_cond ⇒ Object
Returns the value of attribute wait_for_msgs_cond.
32 33 34 |
# File 'lib/nats/io/subscription.rb', line 32
# Returns the current value of @wait_for_msgs_cond.
def wait_for_msgs_cond
  @wait_for_msgs_cond
end
Instance Method Details
#concurrency_semaphore ⇒ Object
78 79 80 |
# File 'lib/nats/io/subscription.rb', line 78
# Returns the semaphore used to cap concurrent message processing, creating it
# on first use with @processing_concurrency permits.
#
# @return [Concurrent::Semaphore]
def concurrency_semaphore
  return @concurrency_semaphore if @concurrency_semaphore

  @concurrency_semaphore = Concurrent::Semaphore.new(@processing_concurrency)
end
#dispatch(msg) ⇒ Object
111 112 113 114 115 116 117 118 119 120 |
# File 'lib/nats/io/subscription.rb', line 111
# Appends +msg+ to the pending queue, accounts for its payload size, and then
# notifies whichever consumer style this subscription uses.
#
# @param msg [#data] message to deliver; msg.data.size is added to pending_size
def dispatch(msg)
  pending_queue.push(msg)
  synchronize do
    self.pending_size = pending_size + msg.data.size
  end

  # For async subscribers, send message for processing to the thread pool.
  if callback
    enqueue_processing(@nc.subscription_executor)
  end

  # For sync subscribers, signal that there is a new message.
  # NOTE(review): presumably the caller already holds this subscription's
  # monitor when signalling — confirm against the call site.
  cond = wait_for_msgs_cond
  cond.signal unless cond.nil?
end
#enqueue_processing(executor) ⇒ Object
Sends the next pending message to a separate thread for processing.
145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/nats/io/subscription.rb', line 145
# Posts a task to +executor+ that pops one pending message and processes it,
# provided a concurrency permit is available. When the task finishes it hands
# its permit back and schedules further tasks for messages still waiting.
#
# @param executor [#post] object that runs the given block, typically on
#   another thread
# @return [Object, nil] nil when no permit was available, otherwise whatever
#   executor.post returns
def enqueue_processing(executor)
  # Keep a handle on the semaphore we acquire from, so the permit goes back to
  # the same object even if #processing_concurrency= swaps it in the meantime.
  semaphore = concurrency_semaphore

  # Previous message is being executed, let it finish and enqueue next one.
  return unless semaphore.try_acquire

  executor.post do
    msg = pending_queue.pop(true)
    process(msg)
  rescue ThreadError
    # Queue is empty: nothing to process. The permit is handed back exactly
    # once by the ensure clause; releasing here as well would add a permit
    # and let more messages run concurrently than configured.
  ensure
    semaphore.release
    [concurrency_semaphore.available_permits, pending_queue.size].min.times do
      enqueue_processing(executor)
    end
  end
end
#inspect ⇒ Object
107 108 109 |
# File 'lib/nats/io/subscription.rb', line 107
# Short human-readable description showing the subject, queue and sid.
#
# @return [String]
def inspect
  details = "subject: \"#{@subject}\", queue: \"#{@queue}\", sid: #{@sid}"
  "#<NATS::Subscription(#{details})>"
end
#next_msg(opts = {}) ⇒ Object
next_msg blocks while waiting for the next message to be received.
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/nats/io/subscription.rb', line 89
# Returns the next pending message, waiting for one to arrive when the queue
# is empty.
#
# @param opts [Hash] options; the hash is only read, never modified
# @option opts [Numeric] :timeout seconds to wait for a message (default 0.5)
# @return [Object] the message popped from the pending queue
# @raise [NATS::Timeout] when the queue is still empty after waiting
def next_msg(opts = {})
  # Read the timeout without writing the default back into the caller's hash,
  # so frozen or shared option hashes are safe to pass.
  timeout = opts[:timeout] || 0.5

  synchronize do
    if @pending_queue.empty?
      # Wait for a bit until getting a signal.
      MonotonicTime.with_nats_timeout(timeout) do
        wait_for_msgs_cond.wait(timeout)
      end
      raise NATS::Timeout if @pending_queue.empty?
    end

    msg = @pending_queue.pop
    # #dispatch added this payload to pending_size; subtract it now that the
    # message is consumed, the same way #process does for async subscribers.
    @pending_size -= msg.data.size
    msg
  end
end
#process(msg) ⇒ Object
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/nats/io/subscription.rb', line 122
# Invokes the subscription callback for +msg+, passing arguments according to
# the callback's arity. Does nothing when no callback is set. Errors raised by
# the callback are forwarded to the connection's err_cb_call.
#
# @param msg [#data, #reply, #subject, #header] message taken from the pending queue
def process(msg)
  return unless callback

  # Decrease pending size since consumed already
  synchronize do
    self.pending_size -= msg.data.size
  end

  nc.reloader.call do
    # Note: Keep some of the alternative arity versions to slightly
    # improve backwards compatibility. Eventually fine to deprecate
    # since recommended version would be arity of 1 to get a NATS::Msg.
    args =
      case callback.arity
      when 0 then []
      when 1 then [msg]
      when 2 then [msg.data, msg.reply]
      when 3 then [msg.data, msg.reply, msg.subject]
      else [msg.data, msg.reply, msg.subject, msg.header]
      end
    callback.call(*args)
  rescue => e
    synchronize { nc.send(:err_cb_call, nc, e, self) }
  end
end
#processing_concurrency=(value) ⇒ Object
Concurrency of message processing for a single subscription. A value of 1 means sequential processing; values of 2 or more allow messages to be processed concurrently and possibly out of order.
70 71 72 73 74 75 76 |
# File 'lib/nats/io/subscription.rb', line 70
# Sets how many messages of this subscription may be processed at once and
# replaces the concurrency semaphore to match. Assigning the current value is
# a no-op and keeps the existing semaphore.
#
# @param value [Integer] new concurrency limit, must be greater than zero
# @raise [ArgumentError] when +value+ is not a positive Integer
def processing_concurrency=(value)
  # Check the type as well as the sign, so nil, strings and floats are rejected
  # with the documented ArgumentError rather than a NoMethodError or a failure
  # deeper inside the semaphore.
  unless value.is_a?(Integer) && value.positive?
    raise ArgumentError, "nats: subscription processing concurrency must be positive integer"
  end
  return if @processing_concurrency == value

  @processing_concurrency = value
  @concurrency_semaphore = Concurrent::Semaphore.new(value)
end
#unsubscribe(opt_max = nil) ⇒ Object
Automatically unsubscribes from the server by sending an UNSUB command, and discards the subscription if it is already present and has received enough messages.
84 85 86 |
# File 'lib/nats/io/subscription.rb', line 84
# Delegates to the connection's non-public unsubscribe method, passing this
# subscription and the optional message limit.
#
# @param opt_max [Object, nil] forwarded unchanged to the connection
# @return [Object] whatever the connection's unsubscribe returns
def unsubscribe(opt_max = nil)
  connection = @nc
  connection.send(:unsubscribe, self, opt_max)
end