Class: FastlyNsq::Listener

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
ListenerTesting
Defined in:
lib/fastly_nsq/listener.rb

Overview

The main interface to setting up a thread that listens for and processes NSQ messages from a given topic/channel.

Examples:

FastlyNsq::Listener.new(
  topic: topic,
  channel: channel,
  processor: ->(m) { puts 'got: '+ m.body }
)

Constant Summary collapse

DEFAULT_PRIORITY =

Default queue priority used when setting up the consumer queue

0
DEFAULT_CONNECTION_TIMEOUT =

Default NSQ connection timeout in seconds

5

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from ListenerTesting

#drain, #messages, prepended

Constructor Details

#initialize(topic:, processor:, preprocessor: FastlyNsq.preprocessor, channel: FastlyNsq.channel, consumer: nil, logger: FastlyNsq.logger, priority: DEFAULT_PRIORITY, connect_timeout: DEFAULT_CONNECTION_TIMEOUT, max_attempts: FastlyNsq.max_attempts, **consumer_options) ⇒ Listener

Create a FastlyNsq::Listener

Examples:

FastlyNsq::Listener.new(
  topic: topic,
  channel: channel,
  processor: MessageProcessor,
  max_attempts: 15,
)

Raises:

  • (ArgumentError)


79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/fastly_nsq/listener.rb', line 79

def initialize(topic:, processor:, preprocessor: FastlyNsq.preprocessor, channel: FastlyNsq.channel, consumer: nil,
  logger: FastlyNsq.logger, priority: DEFAULT_PRIORITY, connect_timeout: DEFAULT_CONNECTION_TIMEOUT,
  max_attempts: FastlyNsq.max_attempts, **consumer_options)

  raise ArgumentError, "processor #{processor.inspect} does not respond to #call" unless processor.respond_to?(:call)
  raise ArgumentError, "priority #{priority.inspect} must be a Integer" unless priority.is_a?(Integer)

  @channel = channel
  @logger = logger
  @max_attempts = max_attempts
  @preprocessor = preprocessor
  @priority = priority
  @processor = processor
  @topic = topic

  @consumer = consumer || FastlyNsq::Consumer.new(topic: topic,
    connect_timeout: connect_timeout,
    channel: channel,
    queue: FastlyNsq::Feeder.new(self, priority),
    max_attempts: max_attempts,
                                                  **consumer_options)

  FastlyNsq.manager.add_listener(self)
end

Instance Attribute Details

#channelString (readonly)



28
29
30
# File 'lib/fastly_nsq/listener.rb', line 28

def channel
  @channel
end

#consumerFastlyNsq::Consumer (readonly)



31
32
33
# File 'lib/fastly_nsq/listener.rb', line 31

def consumer
  @consumer
end

#loggerLogger (readonly)



34
35
36
# File 'lib/fastly_nsq/listener.rb', line 34

def logger
  @logger
end

#max_attemptsInteger (readonly)



37
38
39
# File 'lib/fastly_nsq/listener.rb', line 37

def max_attempts
  @max_attempts
end

#preprocessorProc (readonly)



40
41
42
# File 'lib/fastly_nsq/listener.rb', line 40

def preprocessor
  @preprocessor
end

#priorityInteger (readonly)



46
47
48
# File 'lib/fastly_nsq/listener.rb', line 46

def priority
  @priority
end

#processorProc (readonly)



49
50
51
# File 'lib/fastly_nsq/listener.rb', line 49

def processor
  @processor
end

#topicString (readonly)



43
44
45
# File 'lib/fastly_nsq/listener.rb', line 43

def topic
  @topic
end

Instance Method Details

#call(nsq_message) ⇒ FastlyNsq::Message

Process an NSQ message.

See Also:



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/fastly_nsq/listener.rb', line 113

def call(nsq_message)
  message = FastlyNsq::Message.new(nsq_message)

  msg_info = {
    channel: channel,
    topic: topic,
    attempts: message.attempts,
    id: Digest::MD5.hexdigest(nsq_message.body.to_s),
    nsq_id: message.id,
    metadata: message.meta
  }

  logger.info do
    if logger.level == Logger::DEBUG
      msg_info.merge(data: message.body)
    else
      msg_info
    end
  end

  class_name = processor.is_a?(Class) ? processor.name : processor.class.name

  FastlyNsq.tracer.trace_with_newrelic(params: msg_info, class_name: class_name) do
    preprocessor&.call(message)
    result = processor.call(message)
    message.finish if result
  end

  message
end

#connected?FastlyNsq::Consumer#connected?

Delegated to self.consumer



25
# File 'lib/fastly_nsq/listener.rb', line 25

def_delegators :consumer, :connected?

#terminateObject

Close the NSQ Conneciton

See Also:



148
149
150
151
152
# File 'lib/fastly_nsq/listener.rb', line 148

def terminate
  return unless connected?
  consumer.terminate
  logger.info "topic #{topic}, channel #{channel}: consumer terminated"
end