Class: Nsq::Consumer

Inherits:
ClientBase show all
Defined in:
lib/nsq/consumer.rb

Instance Attribute Summary collapse

Attributes inherited from ClientBase

#connections, #topic

Instance Method Summary collapse

Methods inherited from ClientBase

#connected?, #terminate

Methods included from AttributeLogger

included

Constructor Details

#initialize(opts = {}) ⇒ Consumer

Returns a new instance of Consumer.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/nsq/consumer.rb', line 8

# Stores the consumer's settings from +opts+ and opens its initial
# connection(s).
#
# How connections are established, in priority order:
# 1. +:nsqlookupd+ given - delegate to +discover_repeatedly+.
# 2. +:nsqd+ given       - call +add_connection+ once per address, each with
#    the value returned by +max_in_flight_per_connection+.
# 3. neither given       - one connection to '127.0.0.1:4150' using the full
#    +max_in_flight+.
#
# @param opts [Hash] consumer options
# @option opts [Object, Array] :nsqlookupd one or more lookupd addresses; a
#   single value is wrapped and nested arrays are flattened
# @option opts [Object, Array] :nsqd one or more nsqd addresses (ignored when
#   +:nsqlookupd+ yields at least one entry)
# @option opts [Object] :topic required
# @option opts [Object] :channel required
# @option opts [Object] :max_in_flight defaults to 1
# @option opts [Object] :discovery_interval defaults to 60; passed as
#   +interval:+ to +discover_repeatedly+ (units not visible here -
#   presumably seconds, TODO confirm)
# @option opts [Object] :queue object used to buffer received messages;
#   defaults to a new +Queue+
# @option opts [Object] :msg_timeout stored as given
# @option opts [Object] :max_attempts stored as given
# @option opts [Object] :ssl_context stored as given
# @option opts [Object] :tls_options stored as given
# @option opts [Object] :tls_v1 stored as given
# @raise [ArgumentError] when +:topic+ or +:channel+ is nil or false
def initialize(opts = {})
  lookupd = opts[:nsqlookupd]
  @nsqlookupds = lookupd ? [lookupd].flatten : []

  @topic   = opts[:topic]   || raise(ArgumentError, 'topic is required')
  @channel = opts[:channel] || raise(ArgumentError, 'channel is required')

  @max_in_flight      = opts[:max_in_flight] || 1
  @discovery_interval = opts[:discovery_interval] || 60

  # Optional settings are kept exactly as supplied (nil when absent).
  @msg_timeout  = opts[:msg_timeout]
  @max_attempts = opts[:max_attempts]
  @ssl_context  = opts[:ssl_context]
  @tls_options  = opts[:tls_options]
  @tls_v1       = opts[:tls_v1]

  # Buffer for the messages received from every connection.
  @messages = opts[:queue] || Queue.new

  # Active nsqd connections, keyed by a 'host:port' string
  # (e.g. '127.0.0.1:4150') with the Connection instance as the value.
  @connections = {}

  if @nsqlookupds.empty?
    if opts[:nsqd]
      # Explicit nsqd list: every connection gets the same per-connection limit.
      targets = [opts[:nsqd]].flatten
      per_connection = max_in_flight_per_connection(targets.size)
      targets.each { |address| add_connection(address, max_in_flight: per_connection) }
    else
      # Nothing configured: fall back to a single local nsqd.
      add_connection('127.0.0.1:4150', max_in_flight: @max_in_flight)
    end
  else
    discover_repeatedly(
      nsqlookupds: @nsqlookupds,
      topic: @topic,
      interval: @discovery_interval
    )
  end
end

Instance Attribute Details

#max_in_flight ⇒ Object (readonly)

Returns the value of attribute max_in_flight.



6
7
8
# File 'lib/nsq/consumer.rb', line 6

# Read-only accessor for the max_in_flight setting.
#
# @return [Object] the current value of @max_in_flight
def max_in_flight
  @max_in_flight
end

Instance Method Details

#pop ⇒ Object

Pops the next message off the queue.



52
53
54
# File 'lib/nsq/consumer.rb', line 52

# Removes and returns the next message from the internal message queue by
# delegating to its #pop with no arguments.
#
# @return [Object] whatever the queue's #pop returns
def pop
  @messages.pop
end

#pop_without_blocking ⇒ Object

By default, if the internal queue is empty, pop will block until a new message comes in.

Calling this method won’t block. If there are no messages, it just returns nil.



62
63
64
65
66
67
# File 'lib/nsq/consumer.rb', line 62

# Non-blocking counterpart to #pop: asks the internal queue for its next
# message without waiting for one to arrive.
#
# @return [Object, nil] the next message, or nil when the queue signals that
#   it is empty by raising ThreadError
def pop_without_blocking
  non_block = true
  @messages.pop(non_block)
rescue ThreadError
  # `Queue#pop(true)` raises ThreadError instead of waiting when the queue
  # is empty; report that case to the caller as nil.
  nil
end

#size ⇒ Object

Returns the number of messages we have locally in the queue.



71
72
73
# File 'lib/nsq/consumer.rb', line 71

# Reports how many messages are currently held in the internal message
# queue by delegating to its #size.
#
# @return [Object] whatever the queue's #size returns
def size
  @messages.size
end