Class: Nsq::Consumer
- Inherits: ClientBase
  - Object
    - ClientBase
      - Nsq::Consumer
- Defined in: lib/nsq/consumer.rb
Instance Attribute Summary
- #max_in_flight ⇒ Object (readonly)
  Returns the value of attribute max_in_flight.
Attributes inherited from ClientBase
Instance Method Summary
- #initialize(opts = {}) ⇒ Consumer (constructor)
  A new instance of Consumer.
- #pop ⇒ Object
  Pops the next message off the queue.
- #pop_without_blocking ⇒ Object
  Pops the next message without blocking; returns nil if the internal queue is empty.
- #size ⇒ Object
  Returns the number of messages held locally in the queue.
Methods inherited from ClientBase
Methods included from AttributeLogger
Constructor Details
#initialize(opts = {}) ⇒ Consumer
Returns a new instance of Consumer.
# File 'lib/nsq/consumer.rb', line 8

def initialize(opts = {})
  if opts[:nsqlookupd]
    @nsqlookupds = [opts[:nsqlookupd]].flatten
  else
    @nsqlookupds = []
  end

  @topic = opts[:topic] || raise(ArgumentError, 'topic is required')
  @channel = opts[:channel] || raise(ArgumentError, 'channel is required')
  @max_in_flight = opts[:max_in_flight] || 1
  @discovery_interval = opts[:discovery_interval] || 60
  @msg_timeout = opts[:msg_timeout]
  @max_attempts = opts[:max_attempts]
  @ssl_context = opts[:ssl_context]
  @tls_options = opts[:tls_options]
  @tls_v1 = opts[:tls_v1]

  # This is where we queue up the messages we receive from each connection
  @messages = opts[:queue] || Queue.new

  # This is where we keep a record of our active nsqd connections
  # The key is a string with the host and port of the instance (e.g.
  # '127.0.0.1:4150') and the value is the Connection instance.
  @connections = {}

  if !@nsqlookupds.empty?
    discover_repeatedly(
      nsqlookupds: @nsqlookupds,
      topic: @topic,
      interval: @discovery_interval
    )
  elsif opts[:nsqd]
    nsqds = [opts[:nsqd]].flatten
    max_per_conn = max_in_flight_per_connection(nsqds.size)
    nsqds.each{|d| add_connection(d, max_in_flight: max_per_conn)}
  else
    add_connection('127.0.0.1:4150', max_in_flight: @max_in_flight)
  end
end
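The option handling in the constructor above can be tried in isolation. The following is a minimal sketch, not part of the library: `normalize_consumer_opts` is a hypothetical helper that copies only the defaulting and validation idioms from `initialize` (a single address or an array for `:nsqlookupd`, required `:topic` and `:channel`, defaults of 1 and 60), so it runs without the gem or a running nsqd.

```ruby
# Hypothetical helper (not in nsq-ruby): mirrors the option defaults and
# validation used by Nsq::Consumer#initialize so they can be inspected alone.
def normalize_consumer_opts(opts = {})
  {
    # A single address or an array of addresses both become a flat array.
    nsqlookupds: opts[:nsqlookupd] ? [opts[:nsqlookupd]].flatten : [],
    # `value || raise(...)` makes the key mandatory.
    topic: opts[:topic] || raise(ArgumentError, 'topic is required'),
    channel: opts[:channel] || raise(ArgumentError, 'channel is required'),
    max_in_flight: opts[:max_in_flight] || 1,
    discovery_interval: opts[:discovery_interval] || 60
  }
end

# Example addresses and names below are placeholders.
single = normalize_consumer_opts(topic: 'events', channel: 'worker',
                                 nsqlookupd: 'lookup1:4161')
many   = normalize_consumer_opts(topic: 'events', channel: 'worker',
                                 nsqlookupd: ['lookup1:4161', 'lookup2:4161'],
                                 max_in_flight: 10)
```

Because the lookup is `opts[:key] || default`, passing `nil` or `false` for `:max_in_flight` or `:discovery_interval` falls back to the default as well.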
Instance Attribute Details
#max_in_flight ⇒ Object (readonly)
Returns the value of attribute max_in_flight.
# File 'lib/nsq/consumer.rb', line 6

def max_in_flight
  @max_in_flight
end
Instance Method Details
#pop ⇒ Object
Pops the next message off the queue. If the internal queue is empty, this call blocks until a new message comes in.
# File 'lib/nsq/consumer.rb', line 52

def pop
  @messages.pop
end
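Since `pop` delegates to `Queue#pop`, its blocking behaviour can be illustrated with a plain `Queue` standing in for the consumer's internal buffer. This is a sketch under that assumption; the `messages` queue, the worker thread, and the string payloads are illustrative and are not real `Nsq::Message` objects.

```ruby
# Stand-in for the consumer's internal queue (assumption: a plain Queue,
# which is what the constructor uses when no :queue option is given).
messages = Queue.new
received = []

# The worker blocks inside `pop` until something is pushed.
worker = Thread.new do
  2.times { received << messages.pop }
end

# Simulate messages arriving from nsqd connections.
messages.push('first')
messages.push('second')

worker.join
```

A dedicated thread is a common way to use a blocking `pop`, since the calling thread does nothing else while it waits.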
#pop_without_blocking ⇒ Object
By default, if the internal queue is empty, pop will block until a new message comes in.
Calling this method won’t block. If there are no messages, it just returns nil.
# File 'lib/nsq/consumer.rb', line 62

def pop_without_blocking
  @messages.pop(true)
rescue ThreadError
  # When the Queue is empty calling `Queue#pop(true)` will raise a ThreadError
  nil
end
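One way to use a non-blocking pop is to drain whatever is buffered locally and then move on. The sketch below reproduces the method body on a hypothetical `LocalBuffer` class so it runs without the gem; `LocalBuffer` and its `push` method are assumptions made for the example and do not exist in nsq-ruby.

```ruby
# Hypothetical stand-in for the consumer's local message buffer.
class LocalBuffer
  def initialize(queue = Queue.new)
    @messages = queue
  end

  # Test hook for this sketch only; the real consumer is fed by connections.
  def push(msg)
    @messages.push(msg)
  end

  # Same logic as Nsq::Consumer#pop_without_blocking shown above.
  def pop_without_blocking
    @messages.pop(true)
  rescue ThreadError
    nil
  end

  def size
    @messages.size
  end
end

buffer = LocalBuffer.new
buffer.push('a')
buffer.push('b')

# Drain everything currently buffered; the loop ends on the first nil.
drained = []
while (msg = buffer.pop_without_blocking)
  drained << msg
end
```

The loop relies on `nil` meaning "nothing buffered right now", so it would also stop early if a falsy value were ever queued.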
#size ⇒ Object
Returns the number of messages held locally in the queue.
# File 'lib/nsq/consumer.rb', line 71

def size
  @messages.size
end