Class: Nsq::Consumer
- Inherits:
-
ClientBase
- Object
- ClientBase
- Nsq::Consumer
- Defined in:
- lib/nsq/consumer.rb
Instance Attribute Summary collapse
-
#max_in_flight ⇒ Object
readonly
Returns the value of attribute max_in_flight.
Attributes inherited from ClientBase
Instance Method Summary collapse
-
#initialize(opts = {}) ⇒ Consumer
constructor
A new instance of Consumer.
-
#pop ⇒ Object
pop the next message off the queue.
-
#size ⇒ Object
returns the number of messages we have locally in the queue.
Methods inherited from ClientBase
Methods included from AttributeLogger
Constructor Details
#initialize(opts = {}) ⇒ Consumer
Returns a new instance of Consumer.
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/nsq/consumer.rb', line 8 def initialize(opts = {}) if opts[:nsqlookupd] @nsqlookupds = [opts[:nsqlookupd]].flatten else @nsqlookupds = [] end @topic = opts[:topic] || raise(ArgumentError, 'topic is required') @channel = opts[:channel] || raise(ArgumentError, 'channel is required') @max_in_flight = opts[:max_in_flight] || 1 @discovery_interval = opts[:discovery_interval] || 60 @msg_timeout = opts[:msg_timeout] # This is where we queue up the messages we receive from each connection = opts[:queue] || Queue.new # This is where we keep a record of our active nsqd connections # The key is a string with the host and port of the instance (e.g. # '127.0.0.1:4150') and the key is the Connection instance. @connections = {} if !@nsqlookupds.empty? discover_repeatedly( nsqlookupds: @nsqlookupds, topic: @topic, interval: @discovery_interval ) else # normally, we find nsqd instances to connect to via nsqlookupd(s) # in this case let's connect to an nsqd instance directly add_connection(opts[:nsqd] || '127.0.0.1:4150', max_in_flight: @max_in_flight) end at_exit{terminate} end |
Instance Attribute Details
#max_in_flight ⇒ Object (readonly)
Returns the value of attribute max_in_flight.
6 7 8 |
# File 'lib/nsq/consumer.rb', line 6 def max_in_flight @max_in_flight end |
Instance Method Details
#pop ⇒ Object
pop the next message off the queue
46 47 48 |
# File 'lib/nsq/consumer.rb', line 46 def pop .pop end |
#size ⇒ Object
returns the number of messages we have locally in the queue
52 53 54 |
# File 'lib/nsq/consumer.rb', line 52 def size .size end |