Class: ActivePublisher::Async::RedisAdapter::Consumer
- Inherits:
-
Object
- Object
- ActivePublisher::Async::RedisAdapter::Consumer
- Defined in:
- lib/active_publisher/async/redis_adapter/consumer.rb
Constant Summary collapse
- SUPERVISOR_INTERVAL =
{ :execution_interval => 10, # seconds :timeout_interval => 5, # seconds }
Instance Attribute Summary collapse
-
#consumer ⇒ Object
readonly
Returns the value of attribute consumer.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#supervisor ⇒ Object
readonly
Returns the value of attribute supervisor.
Instance Method Summary collapse
- #create_and_supervise_consumer! ⇒ Object
-
#initialize(redis_pool) ⇒ Consumer
constructor
A new instance of Consumer.
- #size ⇒ Object
Constructor Details
#initialize(redis_pool) ⇒ Consumer
Returns a new instance of Consumer.
14 15 16 17 |
# File 'lib/active_publisher/async/redis_adapter/consumer.rb', line 14 def initialize(redis_pool) @queue = ::ActivePublisher::Async::RedisAdapter::RedisMultiPopQueue.new(redis_pool, ::ActivePublisher::Async::RedisAdapter::REDIS_LIST_KEY) create_and_supervise_consumer! end |
Instance Attribute Details
#consumer ⇒ Object (readonly)
Returns the value of attribute consumer.
12 13 14 |
# File 'lib/active_publisher/async/redis_adapter/consumer.rb', line 12 def consumer @consumer end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
12 13 14 |
# File 'lib/active_publisher/async/redis_adapter/consumer.rb', line 12 def queue @queue end |
#supervisor ⇒ Object (readonly)
Returns the value of attribute supervisor.
12 13 14 |
# File 'lib/active_publisher/async/redis_adapter/consumer.rb', line 12 def supervisor @supervisor end |
Instance Method Details
#create_and_supervise_consumer! ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/active_publisher/async/redis_adapter/consumer.rb', line 19 def create_and_supervise_consumer! @consumer = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue) supervisor_task = ::Concurrent::TimerTask.new(SUPERVISOR_INTERVAL) do # This may also be the place to start additional publishers when we are getting backed up ... ? unless consumer.alive? consumer.kill rescue nil @consumer = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue) end # Notify the current queue size. ::ActiveSupport::Notifications.instrument "redis_async_queue_size.active_publisher", queue.size end supervisor_task.execute end |
#size ⇒ Object
36 37 38 |
# File 'lib/active_publisher/async/redis_adapter/consumer.rb', line 36 def size queue.size end |