Class: FastlyNsq::Producer

Inherits:
Object
  • Object
show all
Includes:
ProducerTesting
Defined in:
lib/fastly_nsq/producer.rb

Overview

Provides an adapter to an Nsq::Producer and used to write messages to the queue.

Examples:

producer = FastlyNsq::Producer.new(topic: 'topic)
producer.write('my message')

Constant Summary collapse

DEFAULT_CONNECTION_TIMEOUT =

seconds

5

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from ProducerTesting

#messages

Constructor Details

#initialize(topic:, tls_options: nil, logger: FastlyNsq.logger, connect_timeout: DEFAULT_CONNECTION_TIMEOUT) ⇒ Producer

Create a FastlyNsq::Producer

Will connect to NSQDs in this priority: 1. direct from FastlyNsq.producer_nsqds 2. discovered via FastlyNsq.lookupd_http_addresses. If both ‘producer_nsqds` and `lookupd_http_addresses` are set only the FastlyNsq.producer_nsqds will be used.

Parameters:

  • topic (String)

    NSQ topic on which to deliver the message

  • tls_options (Hash) (defaults to: nil)

    Hash of TSL options passed the connection. In most cases this should be nil unless you need to override the default values set in ENV.

  • logger (Logger) (defaults to: FastlyNsq.logger)

    defaults to FastlyNsq.logger

  • connect_timeout (Integer) (defaults to: DEFAULT_CONNECTION_TIMEOUT)

    NSQ connection timeout in seconds



36
37
38
39
40
41
42
43
# File 'lib/fastly_nsq/producer.rb', line 36

def initialize(topic:, tls_options: nil, logger: FastlyNsq.logger, connect_timeout: DEFAULT_CONNECTION_TIMEOUT)
  @topic = topic
  @tls_options = FastlyNsq::TlsOptions.as_hash(tls_options)
  @connect_timeout = connect_timeout
  @logger = logger

  connect
end

Instance Attribute Details

#connect_timeoutInteger (readonly)

Returns connection timeout in seconds.

Returns:

  • (Integer)

    connection timeout in seconds



19
20
21
# File 'lib/fastly_nsq/producer.rb', line 19

def connect_timeout
  @connect_timeout
end

#connectionNsq::Producer (readonly)

Returns:

  • (Nsq::Producer)


16
17
18
# File 'lib/fastly_nsq/producer.rb', line 16

def connection
  @connection
end

#loggerLogger (readonly)

Returns:

  • (Logger)


22
23
24
# File 'lib/fastly_nsq/producer.rb', line 22

def logger
  @logger
end

#topicString (readonly)

Returns NSQ Topic.

Returns:

  • (String)

    NSQ Topic



13
14
15
# File 'lib/fastly_nsq/producer.rb', line 13

def topic
  @topic
end

Instance Method Details

#connectBoolean

Create an Nsq::Producer and set as @connection instance variable

Returns:

  • (Boolean)


76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/fastly_nsq/producer.rb', line 76

def connect
  producers = FastlyNsq.producer_nsqds
  lookupd = FastlyNsq.lookupd_http_addresses

  opts = tls_options.merge(topic: topic)

  if !producers.empty?
    opts[:nsqd] = producers
  elsif !lookupd.empty?
    opts[:nsqlookupd] = lookupd
  else
    raise FastlyNsq::ConnectionFailed, "One of FastlyNsq.producer_nsqds or FastlyNsq.lookupd_http_addresses must be present"
  end

  @connection ||= Nsq::Producer.new(opts)

  timeout_args = [
    connect_timeout,
    FastlyNsq::ConnectionFailed,
    "Failed connection to #{opts[:nsqd] || opts[:nsqlookupd]} within #{connect_timeout} seconds"
  ]

  Timeout.timeout(*timeout_args) { Thread.pass until connection.connected? }

  true
rescue FastlyNsq::ConnectionFailed
  logger.error { "Producer for #{topic} failed to connect!" }
  terminate if @connection
  raise
end

#connected?Nsq::Consumer#connected?

Check conenction status

Returns:

  • (Nsq::Consumer#connected?)

See Also:



58
59
60
61
62
# File 'lib/fastly_nsq/producer.rb', line 58

def connected?
  return false unless connection

  connection.connected?
end

#terminateNsq::Producer#terminate

Terminate the NSQ connection and set connection instance to nil

Returns:

See Also:



49
50
51
52
# File 'lib/fastly_nsq/producer.rb', line 49

def terminate
  connection.terminate
  @connection = nil
end

#write(message) ⇒ Nsq::Producer#pop

Write a message

Returns:

  • (Nsq::Producer#pop)

Raises:

See Also:



68
69
70
71
# File 'lib/fastly_nsq/producer.rb', line 68

def write(message)
  raise FastlyNsq::NotConnectedError unless connected?
  connection.write(*message)
end