Class: FastlyNsq::Producer
- Inherits: Object (hierarchy: Object, FastlyNsq::Producer)
- Includes: ProducerTesting
- Defined in: lib/fastly_nsq/producer.rb
Overview
Provides an adapter to an Nsq::Producer. It is used to write messages to the queue.
Constant Summary
- DEFAULT_CONNECTION_TIMEOUT = 5 (seconds)
Instance Attribute Summary
- #connect_timeout ⇒ Integer readonly
  Connection timeout in seconds.
- #connection ⇒ Nsq::Producer readonly
- #logger ⇒ Logger readonly
- #topic ⇒ String readonly
  NSQ Topic.
Instance Method Summary
- #connect ⇒ Boolean
  Create an Nsq::Producer and set it as the @connection instance variable.
- #connected? ⇒ Nsq::Consumer#connected?
  Check connection status.
- #initialize(topic:, tls_options: nil, logger: FastlyNsq.logger, connect_timeout: DEFAULT_CONNECTION_TIMEOUT) ⇒ Producer constructor
  Create a FastlyNsq::Producer.
- #terminate ⇒ Nsq::Producer#terminate
  Terminate the NSQ connection and set the connection instance to nil.
- #write(message) ⇒ Nsq::Producer#pop
  Write a message.
Methods included from ProducerTesting
Constructor Details
#initialize(topic:, tls_options: nil, logger: FastlyNsq.logger, connect_timeout: DEFAULT_CONNECTION_TIMEOUT) ⇒ Producer
Create a FastlyNsq::Producer
    # File 'lib/fastly_nsq/producer.rb', line 33

    def initialize(topic:, tls_options: nil, logger: FastlyNsq.logger, connect_timeout: DEFAULT_CONNECTION_TIMEOUT)
      @topic           = topic
      @tls_options     = FastlyNsq::TlsOptions.as_hash(tls_options)
      @connect_timeout = connect_timeout
      @logger          = logger

      connect
    end
Instance Attribute Details
#connect_timeout ⇒ Integer (readonly)
    # File 'lib/fastly_nsq/producer.rb', line 19

    def connect_timeout
      @connect_timeout
    end
#connection ⇒ Nsq::Producer (readonly)
    # File 'lib/fastly_nsq/producer.rb', line 16

    def connection
      @connection
    end
#logger ⇒ Logger (readonly)
    # File 'lib/fastly_nsq/producer.rb', line 22

    def logger
      @logger
    end
#topic ⇒ String (readonly)
    # File 'lib/fastly_nsq/producer.rb', line 13

    def topic
      @topic
    end
Instance Method Details
#connect ⇒ Boolean
Create an Nsq::Producer and set it as the @connection instance variable. The call waits up to connect_timeout seconds for the connection to come up; on timeout it logs an error, terminates the connection, and re-raises FastlyNsq::ConnectionFailed.

    # File 'lib/fastly_nsq/producer.rb', line 73

    def connect
      lookupd = FastlyNsq.lookupd_http_addresses

      @connection ||= Nsq::Producer.new(
        tls_options.merge(
          nsqlookupd: lookupd,
          topic: topic,
        ),
      )

      timeout_args = [connect_timeout, FastlyNsq::ConnectionFailed]

      if RUBY_VERSION > '2.4.0'
        timeout_args << "Failed connection to #{lookupd} within #{connect_timeout} seconds"
      end

      Timeout.timeout(*timeout_args) { Thread.pass until connection.connected? }

      true
    rescue FastlyNsq::ConnectionFailed
      logger.error { "Producer for #{topic} failed to connect!" }
      terminate
      raise
    end
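The waiting step in #connect can be tried in isolation. The following is a minimal sketch of that pattern using only the standard library. ConnectionFailed, FakeConnection, and wait_for_connection are hypothetical stand-ins invented for illustration and are not part of fastly_nsq. It also assumes a Ruby version where Timeout.timeout accepts a message as its third argument.

```ruby
require "timeout"

# Hypothetical stand-in for FastlyNsq::ConnectionFailed.
class ConnectionFailed < StandardError; end

# Hypothetical connection that reports connected after a delay.
class FakeConnection
  def initialize(ready_after:)
    @ready_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + ready_after
  end

  def connected?
    Process.clock_gettime(Process::CLOCK_MONOTONIC) >= @ready_at
  end
end

# Yield the scheduler until the connection is up, or raise
# ConnectionFailed once timeout_seconds has elapsed.
def wait_for_connection(connection, timeout_seconds)
  message = "Failed connection within #{timeout_seconds} seconds"
  Timeout.timeout(timeout_seconds, ConnectionFailed, message) do
    Thread.pass until connection.connected?
  end
  true
end

# Connects well inside the timeout.
fast_result = wait_for_connection(FakeConnection.new(ready_after: 0.05), 1)

# Never connects inside the timeout, so the custom error is raised.
slow_error = begin
  wait_for_connection(FakeConnection.new(ready_after: 60), 0.1)
rescue ConnectionFailed => e
  e
end
```

Passing the error class to Timeout.timeout means the caller rescues one domain-specific exception instead of Timeout::Error, which appears to be why #connect rescues only FastlyNsq::ConnectionFailed.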
#connected? ⇒ Nsq::Consumer#connected?
Check connection status. Returns false when no connection has been set.

    # File 'lib/fastly_nsq/producer.rb', line 55

    def connected?
      return false unless connection

      connection.connected?
    end
#terminate ⇒ Nsq::Producer#terminate
Terminate the NSQ connection and set connection instance to nil
    # File 'lib/fastly_nsq/producer.rb', line 46

    def terminate
      connection.terminate
      @connection = nil
    end
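To illustrate how #connected? and #terminate interact, here is a small sketch built on stand-in classes. StubConnection and SketchProducer are hypothetical names that copy only the two method bodies listed in this document. They do not talk to NSQ and are not the fastly_nsq classes.

```ruby
# Hypothetical connection that only tracks an open/closed flag.
class StubConnection
  def initialize
    @open = true
  end

  def connected?
    @open
  end

  def terminate
    @open = false
  end
end

# Hypothetical producer that copies the #connected? and #terminate bodies.
class SketchProducer
  attr_reader :connection

  def initialize
    @connection = StubConnection.new
  end

  # Nil guard: a producer without a connection is simply not connected.
  def connected?
    return false unless connection

    connection.connected?
  end

  # Close the connection, then drop the reference.
  def terminate
    connection.terminate
    @connection = nil
  end
end

producer = SketchProducer.new
before = producer.connected?   # connection present and open
producer.terminate
after = producer.connected?    # connection is nil, guard returns false

# With the method body as listed, terminating twice calls
# terminate on nil and raises NoMethodError.
second_terminate_error = begin
  producer.terminate
rescue NoMethodError => e
  e
end
```

Under these assumptions, callers that may terminate more than once would likely want to check connected? first.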
#write(message) ⇒ Nsq::Producer#pop
Write a message
    # File 'lib/fastly_nsq/producer.rb', line 65

    def write(message)
      raise FastlyNsq::NotConnectedError unless connected?
      connection.write(*message)
    end
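The splat in connection.write(*message) affects how different argument types reach the underlying producer. The sketch below shows plain Ruby splat behaviour with a recording stand-in. NotConnectedError, RecordingConnection, and write_via are hypothetical names used only for illustration, and the stand-in's write(*raw_messages) signature is an assumption about the underlying producer.

```ruby
# Hypothetical stand-in for FastlyNsq::NotConnectedError.
class NotConnectedError < StandardError; end

# Hypothetical connection that records every message it receives.
class RecordingConnection
  attr_reader :written

  def initialize(connected: true)
    @connected = connected
    @written = []
  end

  def connected?
    @connected
  end

  def write(*raw_messages)
    @written.concat(raw_messages)
  end
end

# Same guard-then-splat shape as the #write listing.
def write_via(connection, message)
  raise NotConnectedError unless connection&.connected?

  connection.write(*message)
end

conn = RecordingConnection.new
write_via(conn, "hello")      # a String is passed as one message
write_via(conn, %w[a b])      # an Array is expanded into two messages
write_via(conn, { id: 1 })    # a Hash is expanded into [key, value] pairs

offline_error = begin
  write_via(RecordingConnection.new(connected: false), "dropped")
rescue NotConnectedError => e
  e
end
```

Because a Hash is splatted into key/value pairs, it seems safer to serialize structured payloads to a String (for example JSON) before passing them to #write.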