Class: FastlyNsq::Producer

Inherits:
Object
  • Object
show all
Includes:
ProducerTesting
Defined in:
lib/fastly_nsq/producer.rb

Overview

Provides an adapter to an Nsq::Producer and is used to write messages to the queue.

Examples:

producer = FastlyNsq::Producer.new(topic: 'topic')
producer.write('my message')

Constant Summary collapse

DEFAULT_CONNECTION_TIMEOUT =

seconds

5

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from ProducerTesting

#messages

Constructor Details

#initialize(topic:, tls_options: nil, logger: FastlyNsq.logger, connect_timeout: DEFAULT_CONNECTION_TIMEOUT) ⇒ Producer

Create a FastlyNsq::Producer



33
34
35
36
37
38
39
40
# File 'lib/fastly_nsq/producer.rb', line 33

# Build a producer for a single topic and connect it right away.
#
# @param topic [String] NSQ topic this producer writes to
# @param tls_options [Hash, nil] TLS settings; normalized through
#   FastlyNsq::TlsOptions.as_hash before being stored
# @param logger [Logger] destination for connection-failure messages
# @param connect_timeout [Integer] seconds to wait for the connection
def initialize(topic:, tls_options: nil, logger: FastlyNsq.logger, connect_timeout: DEFAULT_CONNECTION_TIMEOUT)
  @topic = topic
  @logger = logger
  @connect_timeout = connect_timeout
  @tls_options = FastlyNsq::TlsOptions.as_hash(tls_options)

  # Connecting is part of construction.
  connect
end

Instance Attribute Details

#connect_timeout ⇒ Integer (readonly)



19
20
21
# File 'lib/fastly_nsq/producer.rb', line 19

# Time allowed for the connection to come up when #connect is called.
#
# @return [Integer] connection timeout in seconds
def connect_timeout
  @connect_timeout
end

#connection ⇒ Nsq::Producer (readonly)



16
17
18
# File 'lib/fastly_nsq/producer.rb', line 16

# The underlying NSQ producer created by #connect.
#
# @return [Nsq::Producer, nil] nil once #terminate has cleared it
def connection
  @connection
end

#logger ⇒ Logger (readonly)



22
23
24
# File 'lib/fastly_nsq/producer.rb', line 22

# Logger used to report connection failures.
#
# @return [Logger]
def logger
  @logger
end

#topic ⇒ String (readonly)



13
14
15
# File 'lib/fastly_nsq/producer.rb', line 13

# Name of the NSQ topic handed to the underlying producer.
#
# @return [String]
def topic
  @topic
end

Instance Method Details

#connect ⇒ Boolean

Create an Nsq::Producer and set as @connection instance variable



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/fastly_nsq/producer.rb', line 73

# Create an Nsq::Producer (unless one already exists), store it in
# @connection, and wait until it reports being connected.
#
# On timeout the failure is logged, the connection is torn down through
# #terminate, and the error is re-raised to the caller.
#
# @return [Boolean] true once the connection is established
# @raise [FastlyNsq::ConnectionFailed] if the producer does not connect
#   within connect_timeout seconds
def connect
  lookupd = FastlyNsq.lookupd_http_addresses

  @connection ||= Nsq::Producer.new(
    tls_options.merge(
      nsqlookupd:  lookupd,
      topic:       topic,
    ),
  )

  timeout_args = [connect_timeout, FastlyNsq::ConnectionFailed]

  # Timeout.timeout takes a custom message argument from Ruby 2.4.0 onward.
  # Compare the version numerically, component by component: a plain string
  # comparison is lexicographic and a strict `>` would also leave out 2.4.0.
  if (RUBY_VERSION.split('.').map(&:to_i) <=> [2, 4, 0]) >= 0
    timeout_args << "Failed connection to #{lookupd} within #{connect_timeout} seconds"
  end

  # Yield to other threads until the producer reports it is connected;
  # the surrounding timeout bounds the wait.
  Timeout.timeout(*timeout_args) { Thread.pass until connection.connected? }

  true
rescue FastlyNsq::ConnectionFailed
  logger.error { "Producer for #{topic} failed to connect!" }
  terminate
  raise
end

#connected? ⇒ Nsq::Consumer#connected?

Check connection status



55
56
57
58
59
# File 'lib/fastly_nsq/producer.rb', line 55

# Report whether the underlying producer is currently connected.
#
# @return [Boolean] false when there is no connection object; otherwise
#   whatever the connection's own connected? returns
def connected?
  if connection
    connection.connected?
  else
    false
  end
end

#terminate ⇒ Nsq::Producer#terminate

Terminate the NSQ connection and set connection instance to nil



46
47
48
49
# File 'lib/fastly_nsq/producer.rb', line 46

# Terminate the NSQ connection and set the connection instance to nil.
#
# Safe to call when there is no connection (for example a second call after
# the connection was already torn down): the underlying terminate is skipped
# instead of raising NoMethodError on nil.
#
# @return [nil]
def terminate
  connection.terminate if connection
  @connection = nil
end

#write(message) ⇒ Nsq::Producer#pop

Write a message



65
66
67
68
# File 'lib/fastly_nsq/producer.rb', line 65

# Publish one message, or several when given an array, through the
# underlying connection.
#
# @param message [String, Array<String>] payload(s); splatted into the
#   connection's write call
# @return [Object] whatever the connection's write returns
# @raise [FastlyNsq::NotConnectedError] if the producer is not connected
def write(message)
  connected? || raise(FastlyNsq::NotConnectedError)

  payloads = *message
  connection.write(*payloads)
end