Class: FastlyNsq::Listener
- Inherits: Object
  - Object
  - FastlyNsq::Listener
- Extended by: Forwardable
- Includes: ListenerTesting
- Defined in: lib/fastly_nsq/listener.rb
Overview
The main interface for setting up a thread that listens for and processes NSQ messages from a given topic/channel.
Constant Summary
- DEFAULT_PRIORITY = 0
  Default queue priority used when setting up the consumer queue.
- DEFAULT_CONNECTION_TIMEOUT = 5
  Default NSQ connection timeout in seconds.
Instance Attribute Summary
- #channel ⇒ String (readonly)
  NSQ Channel.
- #consumer ⇒ FastlyNsq::Consumer (readonly)
- #logger ⇒ Logger (readonly)
- #max_attempts ⇒ Integer (readonly)
  Maximum number of times an NSQ message will be attempted.
- #preprocessor ⇒ Proc (readonly)
- #priority ⇒ Integer (readonly)
  Queue priority.
- #processor ⇒ Proc (readonly)
  Processor that is called for each message.
- #topic ⇒ String (readonly)
  NSQ Topic.
Instance Method Summary
- #call(nsq_message) ⇒ FastlyNsq::Message
  Process an NSQ message.
- #connected? ⇒ FastlyNsq::Consumer#connected?
  Delegated to self.consumer.
- #initialize(topic:, processor:, preprocessor: FastlyNsq.preprocessor, channel: FastlyNsq.channel, consumer: nil, logger: FastlyNsq.logger, priority: DEFAULT_PRIORITY, connect_timeout: DEFAULT_CONNECTION_TIMEOUT, max_attempts: FastlyNsq.max_attempts, **consumer_options) ⇒ Listener (constructor)
  Create a FastlyNsq::Listener.
- #terminate ⇒ Object
  Close the NSQ connection.
Methods included from ListenerTesting
Constructor Details
#initialize(topic:, processor:, preprocessor: FastlyNsq.preprocessor, channel: FastlyNsq.channel, consumer: nil, logger: FastlyNsq.logger, priority: DEFAULT_PRIORITY, connect_timeout: DEFAULT_CONNECTION_TIMEOUT, max_attempts: FastlyNsq.max_attempts, **consumer_options) ⇒ Listener
Create a FastlyNsq::Listener
    # File 'lib/fastly_nsq/listener.rb', line 79
    def initialize(topic:, processor:, preprocessor: FastlyNsq.preprocessor, channel: FastlyNsq.channel, consumer: nil,
      logger: FastlyNsq.logger, priority: DEFAULT_PRIORITY, connect_timeout: DEFAULT_CONNECTION_TIMEOUT,
      max_attempts: FastlyNsq.max_attempts, **)
      raise ArgumentError, "processor #{processor.inspect} does not respond to #call" unless processor.respond_to?(:call)
      raise ArgumentError, "priority #{priority.inspect} must be a Integer" unless priority.is_a?(Integer)

      @channel = channel
      @logger = logger
      @max_attempts = max_attempts
      @preprocessor = preprocessor
      @priority = priority
      @processor = processor
      @topic = topic

      @consumer = consumer || FastlyNsq::Consumer.new(topic: topic,
        connect_timeout: connect_timeout,
        channel: channel,
        queue: FastlyNsq::Feeder.new(self, priority),
        max_attempts: max_attempts,
        **)

      FastlyNsq.manager.add_listener(self)
    end
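The two guard clauses at the top of the constructor decide which arguments are accepted. The sketch below repeats those checks in a free-standing helper so they can be tried without an NSQ connection. `check_listener_args`, `PrintProcessor`, and `lambda_processor` are hypothetical names introduced for illustration; they are not part of fastly_nsq.

```ruby
# Hypothetical helper mirroring the two guard clauses in #initialize.
# It is not part of fastly_nsq; it only repeats the checks shown above.
def check_listener_args(processor:, priority: 0)
  raise ArgumentError, "processor #{processor.inspect} does not respond to #call" unless processor.respond_to?(:call)
  raise ArgumentError, "priority #{priority.inspect} must be a Integer" unless priority.is_a?(Integer)

  true
end

# A class-level processor: the class itself responds to .call.
class PrintProcessor
  def self.call(_message)
    true
  end
end

# A lambda also responds to #call.
lambda_processor = ->(_message) { true }

class_ok  = check_listener_args(processor: PrintProcessor)
lambda_ok = check_listener_args(processor: lambda_processor, priority: 5)

# A String does not respond to #call, so it is rejected.
rejected_processor =
  begin
    check_listener_args(processor: "not callable")
  rescue ArgumentError => e
    e.message
  end

# A Float priority is rejected because it is not an Integer.
rejected_priority =
  begin
    check_listener_args(processor: lambda_processor, priority: 1.5)
  rescue ArgumentError => e
    e.message
  end
```

Anything that responds to `call` passes the first check, so a class with a `self.call` method, a lambda, or a `Method` object should all be usable as a processor.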
Instance Attribute Details
#channel ⇒ String (readonly)
    # File 'lib/fastly_nsq/listener.rb', line 28
    def channel
      @channel
    end
#consumer ⇒ FastlyNsq::Consumer (readonly)
    # File 'lib/fastly_nsq/listener.rb', line 31
    def consumer
      @consumer
    end
#logger ⇒ Logger (readonly)
    # File 'lib/fastly_nsq/listener.rb', line 34
    def logger
      @logger
    end
#max_attempts ⇒ Integer (readonly)
    # File 'lib/fastly_nsq/listener.rb', line 37
    def max_attempts
      @max_attempts
    end
#preprocessor ⇒ Proc (readonly)
    # File 'lib/fastly_nsq/listener.rb', line 40
    def preprocessor
      @preprocessor
    end
#priority ⇒ Integer (readonly)
    # File 'lib/fastly_nsq/listener.rb', line 46
    def priority
      @priority
    end
#processor ⇒ Proc (readonly)
    # File 'lib/fastly_nsq/listener.rb', line 49
    def processor
      @processor
    end
#topic ⇒ String (readonly)
    # File 'lib/fastly_nsq/listener.rb', line 43
    def topic
      @topic
    end
Instance Method Details
#call(nsq_message) ⇒ FastlyNsq::Message
Process an NSQ message.
    # File 'lib/fastly_nsq/listener.rb', line 113
    def call(nsq_message)
      message = FastlyNsq::Message.new(nsq_message)

      msg_info = {
        channel: channel,
        topic: topic,
        attempts: message.attempts,
        id: Digest::MD5.hexdigest(message.body.to_s),
        nsq_id: message.id,
        metadata: message.meta # accessor name assumed; it is missing from the extracted listing
      }

      # Include the message body in the log entry only at DEBUG level.
      logger.info do
        if logger.level == Logger::DEBUG
          msg_info.merge(data: message.body)
        else
          msg_info
        end
      end

      class_name = processor.is_a?(Class) ? processor.name : processor.class.name

      FastlyNsq.tracer.trace_with_newrelic(params: msg_info, class_name: class_name) do
        preprocessor&.call(message)
        result = processor.call(message)
        message.finish if result
      end

      message
    end
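The core of #call is that the optional preprocessor runs first, the processor runs second, and the message is finished only when the processor returns a truthy value. The sketch below isolates that step with a stand-in message object. `FakeMessage` and `process` are hypothetical names for illustration, not gem classes, and the New Relic tracing and logging are left out.

```ruby
require "digest/md5"

# Stand-in for FastlyNsq::Message, for illustration only. It tracks just
# the body and whether #finish was called.
FakeMessage = Struct.new(:body, :finished) do
  def finish
    self.finished = true
  end
end

# Simplified version of the processing step: run the optional preprocessor,
# then the processor, and finish the message only on a truthy result.
def process(message, processor:, preprocessor: nil)
  preprocessor&.call(message)
  result = processor.call(message)
  message.finish if result
  message
end

ok     = process(FakeMessage.new({ "data" => 1 }, false), processor: ->(_m) { true })
failed = process(FakeMessage.new({ "data" => 1 }, false), processor: ->(_m) { false })

# The logged id is an MD5 digest of the body's string form, so messages
# with identical bodies get identical ids.
log_id = Digest::MD5.hexdigest(ok.body.to_s)
```

Here `ok.finished` ends up `true` and `failed.finished` stays `false`. A processor therefore signals success through its return value, so a processor that ends with a falsy expression leaves the message unfinished.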
#connected? ⇒ FastlyNsq::Consumer#connected?
Delegated to self.consumer
    # File 'lib/fastly_nsq/listener.rb', line 25
    def_delegators :consumer, :connected?
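`def_delegators` comes from Ruby's standard `Forwardable` module, which the class extends. A minimal sketch of the same delegation pattern follows. `StubConsumer` and `StubListener` are hypothetical stand-ins, not gem classes.

```ruby
require "forwardable"

# Hypothetical stand-in for a consumer with a fixed connection state.
class StubConsumer
  def initialize(connected)
    @connected = connected
  end

  def connected?
    @connected
  end
end

# Hypothetical stand-in for the listener, showing only the delegation.
class StubListener
  extend Forwardable

  attr_reader :consumer

  # Generates StubListener#connected?, which calls consumer.connected?
  def_delegators :consumer, :connected?

  def initialize(consumer)
    @consumer = consumer
  end
end

up   = StubListener.new(StubConsumer.new(true)).connected?
down = StubListener.new(StubConsumer.new(false)).connected?
```

The listener holds no connection state of its own. Whatever the consumer reports is what `connected?` returns.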
#terminate ⇒ Object
Close the NSQ connection.
    # File 'lib/fastly_nsq/listener.rb', line 148
    def terminate
      return unless connected?

      consumer.terminate
      logger.info "topic #{topic}, channel #{channel}: consumer terminated"
    end
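Because of the `connected?` guard, calling #terminate on an already disconnected listener does nothing. The sketch below shows that behavior with a stand-in consumer. `CountingConsumer` and `terminate_listener` are hypothetical, and the sketch assumes a consumer reports `connected?` as false once it has been terminated, which the listing above does not confirm for the real FastlyNsq::Consumer.

```ruby
require "logger"
require "stringio"

# Hypothetical stand-in consumer that records how often it is terminated.
# Assumption: after #terminate, #connected? returns false.
class CountingConsumer
  attr_reader :terminations

  def initialize
    @connected = true
    @terminations = 0
  end

  def connected?
    @connected
  end

  def terminate
    @connected = false
    @terminations += 1
  end
end

# Same shape as Listener#terminate, written as a free-standing method.
def terminate_listener(consumer, logger, topic:, channel:)
  return unless consumer.connected?

  consumer.terminate
  logger.info "topic #{topic}, channel #{channel}: consumer terminated"
end

log_output = StringIO.new
logger = Logger.new(log_output)
consumer = CountingConsumer.new

terminate_listener(consumer, logger, topic: "orders", channel: "billing")
terminate_listener(consumer, logger, topic: "orders", channel: "billing") # no-op
```

Under that assumption the consumer is terminated once and a single log line is written, so shutdown code can call terminate more than once without side effects.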