Class: FastlyNsq::Feeder
- Inherits:
-
Object
- Object
- FastlyNsq::Feeder
- Defined in:
- lib/fastly_nsq/feeder.rb
Overview
FastlyNsq::Feeder is a queue interface wrapper for the manager’s thread pool. This allows a consumer read loop to post a message directly to a processor (FastlyNsq::Listener) with a specified priority.
Instance Attribute Summary collapse
-
#priority ⇒ Object
readonly
Returns the value of attribute priority.
-
#processor ⇒ Object
readonly
Returns the value of attribute processor.
Instance Method Summary collapse
-
#initialize(processor, priority) ⇒ Feeder
constructor
Create a FastlyNsq::Feeder.
-
#push(message) ⇒ Object
Send a message to the processor with specified priority.
Constructor Details
#initialize(processor, priority) ⇒ Feeder
Create a FastlyNsq::Feeder
14 15 16 17 |
# File 'lib/fastly_nsq/feeder.rb', line 14 def initialize(processor, priority) @processor = processor @priority = priority end |
Instance Attribute Details
#priority ⇒ Object (readonly)
Returns the value of attribute priority.
8 9 10 |
# File 'lib/fastly_nsq/feeder.rb', line 8 def priority @priority end |
#processor ⇒ Object (readonly)
Returns the value of attribute processor.
8 9 10 |
# File 'lib/fastly_nsq/feeder.rb', line 8 def processor @processor end |
Instance Method Details
#push(message) ⇒ Object
Send a message to the processor with specified priority
This will post
to the FastlyNsq.manager.pool with a queue priority and block that will called. FastlyNsq.manager.pool is a PriorityThreadPool which is a Concurrent::ThreadPoolExecutor that has @queue which in turn is a priority queue that manages job priority
The ThreadPoolExecutor is what actually works the @queue and sends call
to the queued Proc. When that code is exec’ed processer.call(message) is run. Processor in this context is a FastlyNsq::Listener
The block also will log exceptions here because Concurrent::ThreadPoolExecutor will swallow the exception.
37 38 39 40 41 42 43 44 45 |
# File 'lib/fastly_nsq/feeder.rb', line 37 def push() FastlyNsq.manager.pool.post(priority) do processor.call() rescue => ex FastlyNsq.logger.error ex FastlyNsq.tracer.notice_error ex raise ex end end |