Class: FastlyNsq::Consumer

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
ConsumerTesting
Defined in:
lib/fastly_nsq/consumer.rb

Overview

Provides an adapter to an Nsq::Consumer and used to read messages off the queue.

Examples:

consumer = FastlyNsq::Consumer.new(
  topic: 'topic',
  channel: 'channel'
)
consumer.size #=> 1
message = consumer.pop
message.body #=> "{ 'data': { 'key': 'value' } }"
message.finish
consumer.size #=> 0
consumer.terminate

Constant Summary collapse

DEFAULT_CONNECTION_TIMEOUT =

Default NSQ connection timeout in seconds

5

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from ConsumerTesting

#messages, prepended, #terminated?

Constructor Details

#initialize(topic:, channel:, queue: nil, tls_options: nil, connect_timeout: DEFAULT_CONNECTION_TIMEOUT, max_attempts: FastlyNsq.max_attempts, **options) ⇒ Consumer

Create a FastlyNsq::Consumer

Will connect to NSQDs in this priority: 1. direct from FastlyNsq.consumer_nsqds 2. discovered via FastlyNsq.lookupd_http_addresses. If both ‘consumer_nsqds` and `lookupd_http_addresses` are set only the FastlyNsq.consumer_nsqds will be used.

Examples:

consumer = FastlyNsq::Consumer.new(
  topic: 'topic',
  channel: 'channel'
)

Parameters:

  • topic (String)

    NSQ topic from which to consume

  • channel (String)

    NSQ channel from which to consume

  • queue (#pop, #size) (defaults to: nil)

    Queue object, most likely an instance of Feeder

  • tls_options (Hash) (defaults to: nil)

    Hash of TSL options passed the connection. In most cases this should be nil unless you need to override the default values set in ENV.

  • connect_timeout (Integer) (defaults to: DEFAULT_CONNECTION_TIMEOUT)

    NSQ connection timeout in seconds

  • max_attempts (Integer) (defaults to: FastlyNsq.max_attempts)

    maximum number of times an NSQ message will be attemped When set to nil, attempts will be unlimited

  • options (Hash)

    addtional options forwarded to the connection contructor



83
84
85
86
87
88
89
90
91
# File 'lib/fastly_nsq/consumer.rb', line 83

def initialize(topic:, channel:, queue: nil, tls_options: nil, connect_timeout: DEFAULT_CONNECTION_TIMEOUT, max_attempts: FastlyNsq.max_attempts, **options)
  @topic = topic
  @channel = channel
  @tls_options = FastlyNsq::TlsOptions.as_hash(tls_options)
  @connect_timeout = connect_timeout
  @max_attempts = max_attempts

  @connection = connect(queue, **options)
end

Instance Attribute Details

#channelString (readonly)

Returns NSQ Channel.

Returns:

  • (String)

    NSQ Channel



25
26
27
# File 'lib/fastly_nsq/consumer.rb', line 25

def channel
  @channel
end

#connect_timeoutInteger (readonly)

Returns connection timeout in seconds.

Returns:

  • (Integer)

    connection timeout in seconds



34
35
36
# File 'lib/fastly_nsq/consumer.rb', line 34

def connect_timeout
  @connect_timeout
end

#connectionNsq::Consumer (readonly)

Returns:

  • (Nsq::Consumer)


31
32
33
# File 'lib/fastly_nsq/consumer.rb', line 31

def connection
  @connection
end

#max_attemptsInteger (readonly)

Returns maximum number of times an NSQ message will be attempted.

Returns:

  • (Integer)

    maximum number of times an NSQ message will be attempted



37
38
39
# File 'lib/fastly_nsq/consumer.rb', line 37

def max_attempts
  @max_attempts
end

#topicString (readonly)

Returns NSQ Topic.

Returns:

  • (String)

    NSQ Topic



28
29
30
# File 'lib/fastly_nsq/consumer.rb', line 28

def topic
  @topic
end

Instance Method Details

#connected?Nsq::Consumer#connected?

Delegated to self.connection

Returns:

  • (Nsq::Consumer#connected?)

See Also:



59
# File 'lib/fastly_nsq/consumer.rb', line 59

def_delegators :connection, :connected?, :pop, :pop_without_blocking, :size, :terminate

#empty?Boolean

Is the message queue empty?

Returns:

  • (Boolean)


96
97
98
# File 'lib/fastly_nsq/consumer.rb', line 96

def empty?
  size.zero?
end

#popNsq::Consumer#pop

Delegated to self.connection

Returns:

  • (Nsq::Consumer#pop)

See Also:



59
# File 'lib/fastly_nsq/consumer.rb', line 59

def_delegators :connection, :connected?, :pop, :pop_without_blocking, :size, :terminate

#pop_without_blockingNsq::Consumer#pop_without_blocking

Delegated to self.connection

Returns:

See Also:



59
# File 'lib/fastly_nsq/consumer.rb', line 59

def_delegators :connection, :connected?, :pop, :pop_without_blocking, :size, :terminate

#sizeNsq::Consumer#size

Delegated to self.connection

Returns:

  • (Nsq::Consumer#size)

See Also:



59
# File 'lib/fastly_nsq/consumer.rb', line 59

def_delegators :connection, :connected?, :pop, :pop_without_blocking, :size, :terminate

#terminateNsq::Consumer#terminate

Delegated to self.connection

Returns:

See Also:



59
# File 'lib/fastly_nsq/consumer.rb', line 59

def_delegators :connection, :connected?, :pop, :pop_without_blocking, :size, :terminate