Class: FastlyNsq::Consumer
- Inherits:
-
Object
- Object
- FastlyNsq::Consumer
- Extended by:
- Forwardable
- Includes:
- ConsumerTesting
- Defined in:
- lib/fastly_nsq/consumer.rb
Overview
Provides an adapter to an Nsq::Consumer and used to read messages off the queue.
Constant Summary collapse
- DEFAULT_CONNECTION_TIMEOUT =
Default NSQ connection timeout in seconds
5
Instance Attribute Summary collapse
-
#channel ⇒ String
readonly
NSQ Channel.
-
#connect_timeout ⇒ Integer
readonly
Connection timeout in seconds.
- #connection ⇒ Nsq::Consumer readonly
-
#max_attempts ⇒ Integer
readonly
Maximum number of times an NSQ message will be attempted.
-
#topic ⇒ String
readonly
NSQ Topic.
Instance Method Summary collapse
-
#connected? ⇒ Nsq::Consumer#connected?
Delegated to
self.connection
. -
#empty? ⇒ Boolean
Is the message queue empty?.
-
#initialize(topic:, channel:, queue: nil, tls_options: nil, connect_timeout: DEFAULT_CONNECTION_TIMEOUT, max_attempts: FastlyNsq.max_attempts, **options) ⇒ Consumer
constructor
Create a FastlyNsq::Consumer.
-
#pop ⇒ Nsq::Consumer#pop
Delegated to
self.connection
. -
#pop_without_blocking ⇒ Nsq::Consumer#pop_without_blocking
Delegated to
self.connection
. -
#size ⇒ Nsq::Consumer#size
Delegated to
self.connection
. -
#terminate ⇒ Nsq::Consumer#terminate
Delegated to
self.connection
.
Methods included from ConsumerTesting
#messages, prepended, #terminated?
Constructor Details
#initialize(topic:, channel:, queue: nil, tls_options: nil, connect_timeout: DEFAULT_CONNECTION_TIMEOUT, max_attempts: FastlyNsq.max_attempts, **options) ⇒ Consumer
Create a FastlyNsq::Consumer
Will connect to NSQDs in this priority: 1. direct from FastlyNsq.consumer_nsqds 2. discovered via FastlyNsq.lookupd_http_addresses. If both ‘consumer_nsqds` and `lookupd_http_addresses` are set only the FastlyNsq.consumer_nsqds will be used.
83 84 85 86 87 88 89 90 91 |
# File 'lib/fastly_nsq/consumer.rb', line 83 def initialize(topic:, channel:, queue: nil, tls_options: nil, connect_timeout: DEFAULT_CONNECTION_TIMEOUT, max_attempts: FastlyNsq.max_attempts, **) @topic = topic @channel = channel = FastlyNsq::TlsOptions.as_hash() @connect_timeout = connect_timeout @max_attempts = max_attempts @connection = connect(queue, **) end |
Instance Attribute Details
#channel ⇒ String (readonly)
Returns NSQ Channel.
25 26 27 |
# File 'lib/fastly_nsq/consumer.rb', line 25 def channel @channel end |
#connect_timeout ⇒ Integer (readonly)
Returns connection timeout in seconds.
34 35 36 |
# File 'lib/fastly_nsq/consumer.rb', line 34 def connect_timeout @connect_timeout end |
#connection ⇒ Nsq::Consumer (readonly)
31 32 33 |
# File 'lib/fastly_nsq/consumer.rb', line 31 def connection @connection end |
#max_attempts ⇒ Integer (readonly)
Returns maximum number of times an NSQ message will be attempted.
37 38 39 |
# File 'lib/fastly_nsq/consumer.rb', line 37 def max_attempts @max_attempts end |
#topic ⇒ String (readonly)
Returns NSQ Topic.
28 29 30 |
# File 'lib/fastly_nsq/consumer.rb', line 28 def topic @topic end |
Instance Method Details
#connected? ⇒ Nsq::Consumer#connected?
Delegated to self.connection
59 |
# File 'lib/fastly_nsq/consumer.rb', line 59 def_delegators :connection, :connected?, :pop, :pop_without_blocking, :size, :terminate |
#empty? ⇒ Boolean
Is the message queue empty?
96 97 98 |
# File 'lib/fastly_nsq/consumer.rb', line 96 def empty? size.zero? end |
#pop ⇒ Nsq::Consumer#pop
Delegated to self.connection
59 |
# File 'lib/fastly_nsq/consumer.rb', line 59 def_delegators :connection, :connected?, :pop, :pop_without_blocking, :size, :terminate |
#pop_without_blocking ⇒ Nsq::Consumer#pop_without_blocking
Delegated to self.connection
59 |
# File 'lib/fastly_nsq/consumer.rb', line 59 def_delegators :connection, :connected?, :pop, :pop_without_blocking, :size, :terminate |
#size ⇒ Nsq::Consumer#size
Delegated to self.connection
59 |
# File 'lib/fastly_nsq/consumer.rb', line 59 def_delegators :connection, :connected?, :pop, :pop_without_blocking, :size, :terminate |
#terminate ⇒ Nsq::Consumer#terminate
Delegated to self.connection
59 |
# File 'lib/fastly_nsq/consumer.rb', line 59 def_delegators :connection, :connected?, :pop, :pop_without_blocking, :size, :terminate |