Class: FastlyNsq::Listener

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
ListenerTesting
Defined in:
lib/fastly_nsq/listener.rb

Overview

The main interface to setting up a thread that listens for and processes NSQ messages from a given topic/channel.

Examples:

FastlyNsq::Listener.new(
  topic: topic,
  channel: channel,
  processor: ->(m) { puts 'got: '+ m.body }
)

Constant Summary collapse

DEFAULT_PRIORITY =

Default queue priority used when setting up the consumer queue

0
DEFAULT_CONNECTION_TIMEOUT =

Default NSQ connection timeout in seconds

5

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from ListenerTesting

#drain, #messages, prepended

Constructor Details

#initialize(topic:, processor:, preprocessor: FastlyNsq.preprocessor, channel: FastlyNsq.channel, consumer: nil, logger: FastlyNsq.logger, priority: DEFAULT_PRIORITY, connect_timeout: DEFAULT_CONNECTION_TIMEOUT, max_attempts: FastlyNsq.max_attempts, **consumer_options) ⇒ Listener

Create a FastlyNsq::Listener

Examples:

FastlyNsq::Listener.new(
  topic: topic,
  channel: channel,
  processor: MessageProcessor,
  max_attempts: 15,
)

Parameters:

  • topic (String)

    NSQ topic on which to listen

  • processor (Proc#call)

    Any object that responds to call. Each message will be processed with processor.call(FastlyNsq::Message.new(nsq_message)). The processor should return true to indicate that processing is complete and NSQ message can be finished. The processor is passed an instance of Message so the provided Proc can optionally manage the message state using methods provided by Message.

  • preprocessor (Proc#call) (defaults to: FastlyNsq.preprocessor)

    Any object that responds to call. Similar to the processor each message it processes via preprocessor.call(message). Default: FastlyNsq.preprocessor

  • channel (String) (defaults to: FastlyNsq.channel)

    NSQ Channel on which to listen. Default: FastlyNsq.channel

  • consumer (FastlyNsq::Consumer) (defaults to: nil)

    interface to read messages off the queue. If value is nil the constructor will create a Consumer based on the provided parameters.

  • logger (Logger) (defaults to: FastlyNsq.logger)

    Default: FastlyNsq.logger

  • priority (Integer) (defaults to: DEFAULT_PRIORITY)

    Queue piority. Default: DEFAULT_PRIORITY

  • connect_timeout (Integer) (defaults to: DEFAULT_CONNECTION_TIMEOUT)

    NSQ connection timeout in seconds. Default: DEFAULT_CONNECTION_TIMEOUT

  • max_attempts (Integer) (defaults to: FastlyNsq.max_attempts)

    maximum number of times an NSQ message will be attemped Default: FastlyNsq.max_attempts When set to nil, attempts will be unlimited

  • consumer_options (Hash)

    additional options forwarded to the Consumer} contructor

Raises:

  • (ArgumentError)


79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/fastly_nsq/listener.rb', line 79

def initialize(topic:, processor:, preprocessor: FastlyNsq.preprocessor, channel: FastlyNsq.channel, consumer: nil,
  logger: FastlyNsq.logger, priority: DEFAULT_PRIORITY, connect_timeout: DEFAULT_CONNECTION_TIMEOUT,
  max_attempts: FastlyNsq.max_attempts, **consumer_options)

  raise ArgumentError, "processor #{processor.inspect} does not respond to #call" unless processor.respond_to?(:call)
  raise ArgumentError, "priority #{priority.inspect} must be a Integer" unless priority.is_a?(Integer)

  @channel = channel
  @logger = logger
  @max_attempts = max_attempts
  @preprocessor = preprocessor
  @priority = priority
  @processor = processor
  @topic = topic

  @consumer = consumer || FastlyNsq::Consumer.new(topic: topic,
    connect_timeout: connect_timeout,
    channel: channel,
    queue: FastlyNsq::Feeder.new(self, priority),
    max_attempts: max_attempts,
                                                  **consumer_options)

  FastlyNsq.manager.add_listener(self)
end

Instance Attribute Details

#channelString (readonly)

Returns NSQ Channel.

Returns:

  • (String)

    NSQ Channel



28
29
30
# File 'lib/fastly_nsq/listener.rb', line 28

def channel
  @channel
end

#consumerFastlyNsq::Consumer (readonly)

Returns:



31
32
33
# File 'lib/fastly_nsq/listener.rb', line 31

def consumer
  @consumer
end

#loggerLogger (readonly)

Returns:

  • (Logger)


34
35
36
# File 'lib/fastly_nsq/listener.rb', line 34

def logger
  @logger
end

#max_attemptsInteger (readonly)

Returns maxium number of times an NSQ message will be attempted.

Returns:

  • (Integer)

    maxium number of times an NSQ message will be attempted



37
38
39
# File 'lib/fastly_nsq/listener.rb', line 37

def max_attempts
  @max_attempts
end

#preprocessorProc (readonly)

Returns:

  • (Proc)


40
41
42
# File 'lib/fastly_nsq/listener.rb', line 40

def preprocessor
  @preprocessor
end

#priorityInteger (readonly)

Returns Queue priority.

Returns:

  • (Integer)

    Queue priority



46
47
48
# File 'lib/fastly_nsq/listener.rb', line 46

def priority
  @priority
end

#processorProc (readonly)

Returns processor that is called for each message.

Returns:

  • (Proc)

    processor that is called for each message



49
50
51
# File 'lib/fastly_nsq/listener.rb', line 49

def processor
  @processor
end

#topicString (readonly)

Returns NSQ Topic.

Returns:

  • (String)

    NSQ Topic



43
44
45
# File 'lib/fastly_nsq/listener.rb', line 43

def topic
  @topic
end

Instance Method Details

#call(nsq_message) ⇒ FastlyNsq::Message

Process an NSQ message.

Parameters:

Returns:

See Also:



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/fastly_nsq/listener.rb', line 113

def call(nsq_message)
  message = FastlyNsq::Message.new(nsq_message)

  msg_info = {
    channel: channel,
    topic: topic,
    attempts: message.attempts,
    id: Digest::MD5.hexdigest(nsq_message.body.to_s),
    nsq_id: message.id,
    metadata: message.meta
  }

  logger.info do
    if logger.level == Logger::DEBUG
      msg_info.merge(data: message.body)
    else
      msg_info
    end
  end

  class_name = processor.is_a?(Class) ? processor.name : processor.class.name

  FastlyNsq.tracer.trace_with_newrelic(params: msg_info, class_name: class_name) do
    preprocessor&.call(message)
    result = processor.call(message)
    message.finish if result
  end

  message
end

#connected?FastlyNsq::Consumer#connected?

Delegated to self.consumer

Returns:



25
# File 'lib/fastly_nsq/listener.rb', line 25

def_delegators :consumer, :connected?

#terminateObject

Close the NSQ Conneciton

See Also:



148
149
150
151
152
# File 'lib/fastly_nsq/listener.rb', line 148

def terminate
  return unless connected?
  consumer.terminate
  logger.info "topic #{topic}, channel #{channel}: consumer terminated"
end