Class: NATS::Subscription

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/nats/io/subscription.rb

Overview

A Subscription represents interest in a given subject.

Examples:

Create NATS subscription with callback.

require 'nats/client'

nc = NATS.connect("demo.nats.io")
sub = nc.subscribe("foo") do |msg|
  puts "Received [#{msg.subject}]: #{msg.data}"
end

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(**opts) ⇒ Subscription

Returns a new instance of Subscription.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/nats/io/subscription.rb', line 39

# Creates a subscription with every attribute reset to its default value.
#
# @param opts [Hash] keyword options
# @option opts [Integer] :processing_concurrency maximum number of messages
#   processed concurrently (1 allows only sequential processing); defaults to
#   NATS::IO::DEFAULT_SINGLE_SUB_CONCURRENCY
def initialize(**opts)
  super() # required to initialize monitor
  @subject = ""
  @queue = nil
  @future = nil
  @callback = nil
  @response = nil
  @received = 0
  @max = nil
  @pending = nil
  @sid = nil
  @nc = nil
  @closed = nil
  @drained = false
  # Initialized explicitly so that every attribute with a reader has a
  # defined instance variable, like the others above.
  @jsi = nil

  # State from async subscriber messages delivery
  @pending_queue = nil
  @pending_size = 0
  @pending_msgs_limit = nil
  @pending_bytes_limit = nil

  # Sync subscriber
  @wait_for_msgs_cond = nil

  # To limit number of concurrent messages being processed (1 to only allow sequential processing)
  @processing_concurrency = opts.fetch(:processing_concurrency, NATS::IO::DEFAULT_SINGLE_SUB_CONCURRENCY)
end

Instance Attribute Details

#callbackObject

Returns the value of attribute callback.



31
32
33
# File 'lib/nats/io/subscription.rb', line 31

# Reader for @callback (nil after construction). #dispatch only enqueues
# asynchronous processing when this is set, and #process invokes it with
# arguments chosen according to its arity.
def callback
  @callback
end

#closedObject

Returns the value of attribute closed.



36
37
38
# File 'lib/nats/io/subscription.rb', line 36

# Reader for @closed, which the constructor leaves as nil.
# NOTE(review): presumably marks the subscription as closed — confirm
# against the code that assigns it.
def closed
  @closed
end

#drainedObject

Returns the value of attribute drained.



36
37
38
# File 'lib/nats/io/subscription.rb', line 36

# Reader for @drained, which the constructor sets to false.
# NOTE(review): presumably flips to true once the subscription has been
# drained — confirm against the code that assigns it.
def drained
  @drained
end

#futureObject

Returns the value of attribute future.



31
32
33
# File 'lib/nats/io/subscription.rb', line 31

# Reader for @future (nil after construction).
# NOTE(review): presumably holds the pending result of a request — confirm
# against the code that assigns it.
def future
  @future
end

#jsiObject

Returns the value of attribute jsi.



35
36
37
# File 'lib/nats/io/subscription.rb', line 35

# Reader for @jsi; returns nil until a value has been assigned.
# NOTE(review): presumably JetStream-related subscription info — confirm.
def jsi
  @jsi
end

#maxObject

Returns the value of attribute max.



31
32
33
# File 'lib/nats/io/subscription.rb', line 31

# Reader for @max (nil after construction).
# NOTE(review): presumably the message count after which the subscription is
# automatically removed (see #unsubscribe) — confirm.
def max
  @max
end

#ncObject

Returns the value of attribute nc.



34
35
36
# File 'lib/nats/io/subscription.rb', line 34

# Reader for @nc (nil after construction): the object this subscription
# calls for its subscription_executor, reloader, error callback and
# unsubscribe handling.
def nc
  @nc
end

#pendingObject

Returns the value of attribute pending.



31
32
33
# File 'lib/nats/io/subscription.rb', line 31

# Reader for @pending (nil after construction). Its meaning is not visible
# from this class — verify against the code that assigns it.
def pending
  @pending
end

#pending_bytes_limitObject

Returns the value of attribute pending_bytes_limit.



33
34
35
# File 'lib/nats/io/subscription.rb', line 33

# Reader for @pending_bytes_limit (nil after construction).
# NOTE(review): presumably the upper bound for #pending_size — confirm
# against the code that enforces it.
def pending_bytes_limit
  @pending_bytes_limit
end

#pending_msgs_limitObject

Returns the value of attribute pending_msgs_limit.



33
34
35
# File 'lib/nats/io/subscription.rb', line 33

# Reader for @pending_msgs_limit (nil after construction).
# NOTE(review): presumably the upper bound for the number of messages in
# #pending_queue — confirm against the code that enforces it.
def pending_msgs_limit
  @pending_msgs_limit
end

#pending_queueObject

Returns the value of attribute pending_queue.



32
33
34
# File 'lib/nats/io/subscription.rb', line 32

# Reader for @pending_queue (nil after construction). #dispatch appends
# incoming messages to it; #enqueue_processing and #next_msg pop from it.
def pending_queue
  @pending_queue
end

#pending_sizeObject

Returns the value of attribute pending_size.



32
33
34
# File 'lib/nats/io/subscription.rb', line 32

# Reader for @pending_size, which starts at 0. #dispatch adds each message's
# data size to it and #process subtracts it again.
def pending_size
  @pending_size
end

#queueObject

Returns the value of attribute queue.



31
32
33
# File 'lib/nats/io/subscription.rb', line 31

# Reader for @queue (nil after construction); included in #inspect output.
# NOTE(review): presumably the queue group name — confirm.
def queue
  @queue
end

#receivedObject Also known as: delivered

Returns the value of attribute received.



31
32
33
# File 'lib/nats/io/subscription.rb', line 31

# Reader for @received, which starts at 0 (also exposed as `delivered`).
# NOTE(review): presumably counts messages received so far — confirm against
# the code that increments it.
def received
  @received
end

#responseObject

Returns the value of attribute response.



31
32
33
# File 'lib/nats/io/subscription.rb', line 31

# Reader for @response (nil after construction). Its meaning is not visible
# from this class — verify against the code that assigns it.
def response
  @response
end

#sidObject

Returns the value of attribute sid.



31
32
33
# File 'lib/nats/io/subscription.rb', line 31

# Reader for @sid (nil after construction); included in #inspect output.
# NOTE(review): presumably the subscription id — confirm.
def sid
  @sid
end

#subjectObject

Returns the value of attribute subject.



31
32
33
# File 'lib/nats/io/subscription.rb', line 31

# Reader for @subject, which starts as an empty string; included in #inspect
# output.
def subject
  @subject
end

#wait_for_msgs_condObject

Returns the value of attribute wait_for_msgs_cond.



32
33
34
# File 'lib/nats/io/subscription.rb', line 32

# Reader for @wait_for_msgs_cond (nil after construction). When set,
# #dispatch signals it for every new message and #next_msg waits on it.
def wait_for_msgs_cond
  @wait_for_msgs_cond
end

Instance Method Details

#concurrency_semaphoreObject



78
79
80
# File 'lib/nats/io/subscription.rb', line 78

# Semaphore that caps how many messages of this subscription are processed
# at once. It is created on first use with @processing_concurrency permits
# and cached afterwards.
def concurrency_semaphore
  return @concurrency_semaphore if @concurrency_semaphore

  @concurrency_semaphore = Concurrent::Semaphore.new(@processing_concurrency)
end

#dispatch(msg) ⇒ Object



111
112
113
114
115
116
117
118
119
120
# File 'lib/nats/io/subscription.rb', line 111

# Accepts a newly received message: queues it, accounts for its payload size
# and wakes up whichever consumer handles this subscription.
#
# NOTE(review): if the condition is a MonitorMixin condition variable,
# signalling it presumably requires the caller to already hold this
# subscription's monitor — confirm against the call site.
#
# @param msg [#data] message to enqueue
def dispatch(msg)
  pending_queue << msg
  synchronize do
    self.pending_size = pending_size + msg.data.size
  end

  # Async subscribers: hand the work over to the client's thread pool.
  if callback
    enqueue_processing(@nc.subscription_executor)
  end

  # Sync subscribers: let a waiting reader know a message has arrived.
  cond = wait_for_msgs_cond
  cond.signal unless cond.nil?
end

#enqueue_processing(executor) ⇒ Object

Sends a message to a separate thread for processing.



145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/nats/io/subscription.rb', line 145

# Schedules the processing of one pending message on +executor+, bounded by
# the subscription's concurrency semaphore.
#
# A permit is acquired before posting; when none is available the method
# returns without posting. The posted task pops one message without blocking
# and passes it to #process. Afterwards it releases its permit exactly once
# and schedules further tasks for messages that are still queued, limited by
# the number of free permits.
#
# @param executor [#post] executor that runs the posted block
def enqueue_processing(executor)
  concurrency_semaphore.try_acquire || return # Previous message is being executed, let it finish and enqueue next one.
  executor.post do
    msg = pending_queue.pop(true)
    process(msg)
  rescue ThreadError
    # Queue is empty: nothing to process. The permit must NOT be released
    # here, because `ensure` below already releases it; releasing twice would
    # grow the semaphore beyond the configured concurrency.
  ensure
    concurrency_semaphore.release
    [concurrency_semaphore.available_permits, pending_queue.size].min.times do
      enqueue_processing(executor)
    end
  end
end

#inspectObject



107
108
109
# File 'lib/nats/io/subscription.rb', line 107

# Compact description of the subscription showing its subject, queue and
# sid.
#
# @return [String]
def inspect
  subject_name = @subject
  queue_name = @queue
  %Q[#<NATS::Subscription(subject: "#{subject_name}", queue: "#{queue_name}", sid: #{@sid})>]
end

#next_msg(opts = {}) ⇒ Object

next_msg blocks, waiting for the next message to be received.



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/nats/io/subscription.rb', line 89

# Blocks until the next message is available and returns it.
#
# Returns right away when a message is already queued; otherwise waits on the
# subscription's condition variable for up to the timeout and checks the
# queue once more.
#
# @param opts [Hash] options; the hash is only read, never modified
# @option opts [Numeric] :timeout seconds to wait for a message (default 0.5)
# @return [Object] the message popped from the pending queue
# @raise [NATS::Timeout] when no message is queued after waiting
def next_msg(opts = {})
  # Read the timeout without writing the default back into the caller's
  # hash (which would also fail on a frozen hash).
  timeout = opts[:timeout] || 0.5
  synchronize do
    if @pending_queue.empty?
      # Wait for a bit until getting a signal.
      MonotonicTime.with_nats_timeout(timeout) do
        wait_for_msgs_cond.wait(timeout)
      end

      raise NATS::Timeout if @pending_queue.empty?
    end

    msg = @pending_queue.pop
    # #dispatch counted this message's payload as pending; it has now been
    # consumed, so take it out again (as #process does for callbacks).
    @pending_size -= msg.data.size
    msg
  end
end

#process(msg) ⇒ Object



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/nats/io/subscription.rb', line 122

# Delivers +msg+ to the subscription callback inside the client's reloader.
#
# Does nothing when no callback is set. Errors raised while delivering are
# rescued and forwarded to the client's error callback together with this
# subscription.
#
# @param msg [#data, #reply, #subject, #header] message to deliver
def process(msg)
  return unless callback

  # The message has been consumed, so its payload no longer counts as pending.
  synchronize { self.pending_size = pending_size - msg.data.size }

  nc.reloader.call do
    # Older callback signatures are still supported to ease backwards
    # compatibility; they may eventually be deprecated, since a single
    # NATS::Msg parameter (arity 1) is the recommended form.
    args =
      case callback.arity
      when 0 then []
      when 1 then [msg]
      when 2 then [msg.data, msg.reply]
      when 3 then [msg.data, msg.reply, msg.subject]
      else [msg.data, msg.reply, msg.subject, msg.header]
      end
    callback.call(*args)
  rescue => e
    synchronize { nc.send(:err_cb_call, nc, e, self) }
  end
end

#processing_concurrency=(value) ⇒ Object

Concurrency of message processing for a single subscription. 1 means sequential processing; 2 or more allows messages to be processed concurrently and possibly out of order.

Raises:

  • (ArgumentError)


70
71
72
73
74
75
76
# File 'lib/nats/io/subscription.rb', line 70

# Sets how many messages of this subscription may be processed concurrently.
# 1 means sequential processing; 2 or more allows concurrent and possibly
# out-of-order processing.
#
# Assigning the current value again is a no-op; any other valid value
# replaces the concurrency semaphore with a new one of that size.
#
# @param value [Integer] positive number of concurrent workers
# @raise [ArgumentError] when +value+ is not a positive Integer
def processing_concurrency=(value)
  # Check the type as well as the sign so that nil, strings and floats are
  # rejected with the documented ArgumentError before any state changes.
  unless value.is_a?(Integer) && value.positive?
    raise ArgumentError, "nats: subscription processing concurrency must be positive integer"
  end
  return if @processing_concurrency == value

  @processing_concurrency = value
  @concurrency_semaphore = Concurrent::Semaphore.new(value)
end

#unsubscribe(opt_max = nil) ⇒ Object

Auto-unsubscribes from the server by sending an UNSUB command, and discards the subscription if it is already present and has received enough messages.



84
85
86
# File 'lib/nats/io/subscription.rb', line 84

# Asks the client (@nc) to unsubscribe this subscription by invoking its
# `unsubscribe` method through #send with this subscription and +opt_max+.
#
# @param opt_max [Object, nil] forwarded unchanged to the client;
#   NOTE(review): presumably the number of messages after which the
#   subscription is removed — confirm against the client implementation
def unsubscribe(opt_max = nil)
  @nc.send(:unsubscribe, self, opt_max)
end