Class: FastlyNsq::Feeder

Inherits:
Object
  • Object
Defined in:
lib/fastly_nsq/feeder.rb

Overview

FastlyNsq::Feeder is a queue interface wrapper for the manager’s thread pool. This allows a consumer read loop to post a message directly to a processor (FastlyNsq::Listener) with a specified priority.
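The flow described above can be sketched with plain Ruby stand-ins. This is a minimal illustration, not the fastly_nsq implementation: SketchPool and SketchFeeder are hypothetical names, the pool is passed to push explicitly rather than looked up through FastlyNsq.manager.pool, jobs run synchronously when drain is called, and the ordering rule (a larger number runs first) is an assumption made only for this example.

```ruby
# Hypothetical stand-in for the manager's thread pool. It only queues
# blocks and runs them on demand; the real pool runs them on threads.
class SketchPool
  def initialize
    @jobs = []
    @seq = 0
  end

  # Queue a block together with its priority.
  def post(priority, &block)
    @jobs << [priority, @seq, block]
    @seq += 1
  end

  # Run queued blocks. Assumption for this sketch: larger priority first,
  # first-in first-out among jobs of equal priority.
  def drain
    ordered = @jobs.sort_by { |priority, seq, _| [-priority, seq] }
    ordered.each { |_, _, job| job.call }
    @jobs.clear
  end
end

# Hypothetical miniature of the feeder: it binds a processor to a priority.
class SketchFeeder
  attr_reader :processor, :priority

  def initialize(processor, priority)
    @processor = processor
    @priority = priority
  end

  # Post a job that hands the message to the processor.
  def push(message, pool)
    pool.post(priority) { processor.call(message) }
  end
end

handled = []
processor = ->(message) { handled << message } # anything responding to call
pool = SketchPool.new

low = SketchFeeder.new(processor, 1)
high = SketchFeeder.new(processor, 10)

low.push("low-1", pool)
high.push("high-1", pool)
low.push("low-2", pool)
pool.drain
```

In this sketch the read loop never touches the processor directly; it only calls push, and the pool decides when each job runs.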

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(processor, priority) ⇒ Feeder

Create a FastlyNsq::Feeder

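Parameters:

  • processor: the object that receives each message via call (a FastlyNsq::Listener in this context)
  • priority: the queue priority used when posting to the manager’s thread pool
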
# File 'lib/fastly_nsq/feeder.rb', line 14

def initialize(processor, priority)
  @processor = processor
  @priority = priority
end

Instance Attribute Details

#priority ⇒ Object (readonly)

Returns the value of attribute priority.



# File 'lib/fastly_nsq/feeder.rb', line 8

def priority
  @priority
end

#processor ⇒ Object (readonly)

Returns the value of attribute processor.



# File 'lib/fastly_nsq/feeder.rb', line 8

def processor
  @processor
end

Instance Method Details

#push(message) ⇒ Object

Send a message to the processor with the specified priority.

This posts to FastlyNsq.manager.pool with a queue priority and a block to be called. FastlyNsq.manager.pool is a PriorityThreadPool, which is a Concurrent::ThreadPoolExecutor whose @queue is a priority queue that manages job priority.

The ThreadPoolExecutor is what actually works the @queue and sends call to the queued Proc. When that block is executed, processor.call(message) is run. The processor in this context is a FastlyNsq::Listener.

The block also logs exceptions, because Concurrent::ThreadPoolExecutor would otherwise swallow them.



# File 'lib/fastly_nsq/feeder.rb', line 37

def push(message)
  FastlyNsq.manager.pool.post(priority) do
    processor.call(message)
  rescue => ex
    FastlyNsq.logger.error ex
    FastlyNsq.tracer.notice_error ex
    raise ex
  end
end
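
The reason for logging inside the block can be illustrated with a small stand-alone sketch. Everything here is hypothetical: swallowing_run stands in for an executor that discards exceptions raised by its jobs, failing_processor stands in for a listener, and a standard library Logger writing to a StringIO replaces FastlyNsq.logger. The tracer call from the source is left out.

```ruby
require "logger"
require "stringio"

log_output = StringIO.new
logger = Logger.new(log_output)

# Hypothetical executor: runs a job and silently discards any error,
# mimicking the swallowing behavior described for the thread pool.
swallowing_run = lambda do |job|
  begin
    job.call
  rescue StandardError
    nil
  end
end

# Hypothetical processor that always fails.
failing_processor = ->(_message) { raise ArgumentError, "bad payload" }

# Same shape as the block posted by push: call, log on failure, re-raise.
job = lambda do
  begin
    failing_processor.call("hello")
  rescue => ex
    logger.error(ex)
    raise ex
  end
end

result = swallowing_run.call(job)
logged = log_output.string
```

The caller of swallowing_run sees no exception, so the log entry written inside the job is the only trace of the failure in this sketch.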