Class: ActivePublisher::Async::RedisAdapter::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/active_publisher/async/redis_adapter/consumer.rb

Constant Summary collapse

# Options handed to ::Concurrent::TimerTask.new for the consumer supervisor.
# Frozen so this shared constant cannot be mutated by accident at runtime.
SUPERVISOR_INTERVAL = {
  :execution_interval => 10, # seconds
  :timeout_interval => 5, # seconds
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(redis_pool) ⇒ Consumer

Returns a new instance of Consumer.



14
15
16
17
# File 'lib/active_publisher/async/redis_adapter/consumer.rb', line 14

# Builds the Redis-backed queue and then starts the supervised consumer.
#
# @param redis_pool passed through unchanged as the first argument to
#   RedisMultiPopQueue.new (presumably a Redis connection pool -- confirm
#   against RedisMultiPopQueue)
def initialize(redis_pool)
  adapter = ::ActivePublisher::Async::RedisAdapter
  @queue = adapter::RedisMultiPopQueue.new(
    redis_pool,
    adapter::REDIS_LIST_KEY
  )
  create_and_supervise_consumer!
end

Instance Attribute Details

#consumer ⇒ Object (readonly)

Returns the value of attribute consumer.



12
13
14
# File 'lib/active_publisher/async/redis_adapter/consumer.rb', line 12

# Read-only accessor for the consumer.
#
# @return [Object, nil] the current value of @consumer (nil if it has not
#   been assigned)
def consumer
  @consumer
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



12
13
14
# File 'lib/active_publisher/async/redis_adapter/consumer.rb', line 12

# Read-only accessor for the queue.
#
# @return [Object, nil] the current value of @queue (nil if it has not
#   been assigned)
def queue
  @queue
end

#supervisor ⇒ Object (readonly)

Returns the value of attribute supervisor.



12
13
14
# File 'lib/active_publisher/async/redis_adapter/consumer.rb', line 12

# Read-only accessor for the supervisor.
#
# @return [Object, nil] the current value of @supervisor (nil if it has not
#   been assigned)
def supervisor
  @supervisor
end

Instance Method Details

#create_and_supervise_consumer! ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/active_publisher/async/redis_adapter/consumer.rb', line 19

# Starts the consumer thread and a periodic supervisor task that watches it.
#
# Each time the timer task fires (configured by SUPERVISOR_INTERVAL) it
# replaces the consumer if it is no longer alive, then publishes the current
# queue size through ActiveSupport::Notifications.
#
# The timer task is stored in @supervisor so that the `supervisor` reader
# exposes it; keeping it only in a local variable left @supervisor unassigned.
#
# @return [Object] whatever `execute` returns on the timer task
def create_and_supervise_consumer!
  @consumer = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue)

  @supervisor = ::Concurrent::TimerTask.new(SUPERVISOR_INTERVAL) do
    # This may also be the place to start additional publishers when we are getting backed up ... ?
    unless consumer.alive?
      # Best-effort cleanup of the dead consumer: a failure here must not
      # prevent the replacement from being started.
      begin
        consumer.kill
      rescue ::StandardError
        nil
      end
      @consumer = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue)
    end

    # Notify the current queue size.
    ::ActiveSupport::Notifications.instrument "redis_async_queue_size.active_publisher", queue.size
  end

  @supervisor.execute
end

#size ⇒ Object



36
37
38
# File 'lib/active_publisher/async/redis_adapter/consumer.rb', line 36

# Delegates to the queue's #size.
#
# @return [Object] whatever queue.size returns (presumably the number of
#   messages currently waiting in the queue -- confirm against
#   RedisMultiPopQueue#size)
def size
  queue.size
end