Class: FastlyNsq::Producer
- Inherits: Object
  - Object
    - FastlyNsq::Producer
- Includes: ProducerTesting
- Defined in: lib/fastly_nsq/producer.rb
Overview
Provides an adapter to an Nsq::Producer and is used to write messages to the queue.
Constant Summary
- DEFAULT_CONNECTION_TIMEOUT = 5 (seconds)
Instance Attribute Summary
- #connect_timeout ⇒ Integer (readonly)
  Connection timeout in seconds.
- #connection ⇒ Nsq::Producer (readonly)
- #logger ⇒ Logger (readonly)
- #topic ⇒ String (readonly)
  NSQ Topic.
Instance Method Summary
- #connect ⇒ Boolean
  Create an Nsq::Producer and set it as the @connection instance variable.
- #connected? ⇒ Nsq::Consumer#connected?
  Check connection status.
- #initialize(topic:, tls_options: nil, logger: FastlyNsq.logger, connect_timeout: DEFAULT_CONNECTION_TIMEOUT) ⇒ Producer (constructor)
  Create a FastlyNsq::Producer.
- #terminate ⇒ Nsq::Producer#terminate
  Terminate the NSQ connection and set the connection instance to nil.
- #write(message) ⇒ Nsq::Producer#pop
  Write a message.
Methods included from ProducerTesting
Constructor Details
#initialize(topic:, tls_options: nil, logger: FastlyNsq.logger, connect_timeout: DEFAULT_CONNECTION_TIMEOUT) ⇒ Producer
Create a FastlyNsq::Producer.
Connects to NSQDs in this order of priority:
1. directly, using FastlyNsq.producer_nsqds
2. through discovery, using FastlyNsq.lookupd_http_addresses
If both `producer_nsqds` and `lookupd_http_addresses` are set, only FastlyNsq.producer_nsqds is used.
    # File 'lib/fastly_nsq/producer.rb', line 36
    def initialize(topic:, tls_options: nil, logger: FastlyNsq.logger, connect_timeout: DEFAULT_CONNECTION_TIMEOUT)
      @topic = topic
      @tls_options = FastlyNsq::TlsOptions.as_hash(tls_options)
      @connect_timeout = connect_timeout
      @logger = logger

      connect
    end
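The priority rule described for the constructor can be sketched in isolation as a plain Ruby method. This is only an illustration: `nsq_target_options` and the `ConnectionFailed` stand-in class are hypothetical names, not part of fastly_nsq, and the addresses are made up.

```ruby
# Stand-in for FastlyNsq::ConnectionFailed so the sketch runs without the gem.
class ConnectionFailed < StandardError; end

# Hypothetical helper (not part of fastly_nsq): mirrors the documented
# priority between direct nsqd addresses and nsqlookupd discovery.
def nsq_target_options(producer_nsqds:, lookupd_http_addresses:)
  if !producer_nsqds.empty?
    { nsqd: producer_nsqds }                # direct addresses take priority
  elsif !lookupd_http_addresses.empty?
    { nsqlookupd: lookupd_http_addresses }  # otherwise fall back to discovery
  else
    raise ConnectionFailed, "no nsqd or nsqlookupd addresses configured"
  end
end

# With both lists set, only the direct nsqd addresses are kept.
both = nsq_target_options(
  producer_nsqds: ["127.0.0.1:4150"],
  lookupd_http_addresses: ["127.0.0.1:4161"]
)
puts both.key?(:nsqd)
puts both.key?(:nsqlookupd)
```

Because the constructor calls connect immediately, a configuration with neither list set would surface as an exception from `new` rather than from a later call.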
Instance Attribute Details
#connect_timeout ⇒ Integer (readonly)
Returns connection timeout in seconds.
    # File 'lib/fastly_nsq/producer.rb', line 19
    def connect_timeout
      @connect_timeout
    end
#connection ⇒ Nsq::Producer (readonly)
    # File 'lib/fastly_nsq/producer.rb', line 16
    def connection
      @connection
    end
#logger ⇒ Logger (readonly)
    # File 'lib/fastly_nsq/producer.rb', line 22
    def logger
      @logger
    end
#topic ⇒ String (readonly)
Returns NSQ Topic.
    # File 'lib/fastly_nsq/producer.rb', line 13
    def topic
      @topic
    end
Instance Method Details
#connect ⇒ Boolean
Create an Nsq::Producer and set it as the @connection instance variable.
    # File 'lib/fastly_nsq/producer.rb', line 76
    def connect
      producers = FastlyNsq.producer_nsqds
      lookupd = FastlyNsq.lookupd_http_addresses

      # Receiver assumed: the TLS options hash built in #initialize.
      opts = @tls_options.merge(topic: topic)

      if !producers.empty?
        opts[:nsqd] = producers
      elsif !lookupd.empty?
        opts[:nsqlookupd] = lookupd
      else
        raise FastlyNsq::ConnectionFailed, "One of FastlyNsq.producer_nsqds or FastlyNsq.lookupd_http_addresses must be present"
      end

      @connection ||= Nsq::Producer.new(opts)

      timeout_args = [
        connect_timeout,
        FastlyNsq::ConnectionFailed,
        "Failed connection to #{opts[:nsqd] || opts[:nsqlookupd]} within #{connect_timeout} seconds"
      ]

      Timeout.timeout(*timeout_args) { Thread.pass until connection.connected? }

      true
    rescue FastlyNsq::ConnectionFailed
      logger.error { "Producer for #{topic} failed to connect!" }
      terminate if @connection
      raise
    end
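The wait loop in #connect relies on the three-argument form of Ruby's `Timeout.timeout` (seconds, exception class, message). The sketch below isolates that pattern with the standard library only. `PollingConnection`, `wait_until_connected`, and the `ConnectionFailed` stand-in are hypothetical names used for illustration, not part of fastly_nsq or nsq-ruby.

```ruby
require "timeout"

# Stand-in for FastlyNsq::ConnectionFailed so the sketch runs without the gem.
class ConnectionFailed < StandardError; end

# Hypothetical connection that starts reporting connected after a delay.
class PollingConnection
  def initialize(ready_after)
    @ready_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + ready_after
  end

  def connected?
    Process.clock_gettime(Process::CLOCK_MONOTONIC) >= @ready_at
  end
end

# Poll until the connection is up, or raise ConnectionFailed once
# `seconds` have elapsed, as the listing for #connect does.
def wait_until_connected(connection, seconds)
  Timeout.timeout(seconds, ConnectionFailed, "Failed connection within #{seconds} seconds") do
    Thread.pass until connection.connected?
  end
  true
end

puts wait_until_connected(PollingConnection.new(0.05), 1)

begin
  wait_until_connected(PollingConnection.new(5), 0.1)
rescue ConnectionFailed => e
  puts e.message
end
```

Passing the exception class to `Timeout.timeout` means the caller rescues a single domain-specific error instead of `Timeout::Error`, which is what allows the `rescue FastlyNsq::ConnectionFailed` clause in the listing to cover both the missing-configuration case and the timeout case.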
#connected? ⇒ Nsq::Consumer#connected?
Check connection status.
    # File 'lib/fastly_nsq/producer.rb', line 58
    def connected?
      return false unless connection

      connection.connected?
    end
#terminate ⇒ Nsq::Producer#terminate
Terminate the NSQ connection and set the connection instance to nil.
    # File 'lib/fastly_nsq/producer.rb', line 49
    def terminate
      connection.terminate
      @connection = nil
    end
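How #connected? and #terminate interact can be sketched with stand-in classes. `StubConnection` and `ProducerSketch` are hypothetical; they copy only the two method bodies listed above and do not talk to NSQ.

```ruby
# Stand-in connection; the real object would be an Nsq::Producer.
class StubConnection
  def connected?
    true
  end

  def terminate
    :terminated
  end
end

# Hypothetical class reproducing only #connected? and #terminate as listed.
class ProducerSketch
  attr_reader :connection

  def initialize(connection)
    @connection = connection
  end

  def connected?
    return false unless connection

    connection.connected?
  end

  def terminate
    connection.terminate
    @connection = nil
  end
end

producer = ProducerSketch.new(StubConnection.new)
puts producer.connected?   # connection present and connected
producer.terminate
puts producer.connected?   # connection is nil, so the guard returns false

begin
  producer.terminate       # connection is already nil at this point
rescue NoMethodError => e
  puts e.class
end
```

As the last step suggests, with the method bodies as listed a second #terminate call would be made on nil, so callers may want to check #connected? or the connection itself before terminating again.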
#write(message) ⇒ Nsq::Producer#pop
Write a message.
    # File 'lib/fastly_nsq/producer.rb', line 68
    def write(message)
      raise FastlyNsq::NotConnectedError unless connected?
      connection.write(*message)
    end
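The splat in `connection.write(*message)` suggests that #write accepts either a single message or an array of messages. The sketch below shows that behaviour with stand-ins only. `RecordingConnection`, `WriterSketch`, and the `NotConnectedError` stand-in are hypothetical, and the variadic `write(*messages)` signature on the connection is an assumption about the underlying producer.

```ruby
# Stand-in for FastlyNsq::NotConnectedError so the sketch runs without the gem.
class NotConnectedError < StandardError; end

# Hypothetical connection that records what it is asked to write.
# Assumes the real connection accepts a variable number of messages.
class RecordingConnection
  attr_reader :written

  def initialize(connected: true)
    @connected = connected
    @written = []
  end

  def connected?
    @connected
  end

  def write(*messages)
    @written.concat(messages)
  end
end

# Hypothetical class reproducing only the #write and #connected? logic.
class WriterSketch
  def initialize(connection)
    @connection = connection
  end

  def connected?
    return false unless @connection

    @connection.connected?
  end

  def write(message)
    raise NotConnectedError unless connected?

    @connection.write(*message)  # a String passes through; an Array is spread
  end
end

connection = RecordingConnection.new
writer = WriterSketch.new(connection)
writer.write("one")
writer.write(["two", "three"])
puts connection.written.length

begin
  WriterSketch.new(RecordingConnection.new(connected: false)).write("dropped")
rescue NotConnectedError
  puts "not connected"
end
```

The connection check runs before every write, so a producer whose connection was terminated fails fast with the not-connected error instead of calling into a nil connection.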