Class: Proletariat::Subscriber

Inherits:
Actor
  • Object
show all
Includes:
Concerns::Logging
Defined in:
lib/proletariat/subscriber.rb

Overview

Internal: Creates, binds and listens on a RabbitMQ queue. Forwards

messages to a given listener.

Defined Under Namespace

Classes: Acknowledger

Instance Method Summary collapse

Methods included from Concerns::Logging

#log_info

Methods inherited from Actor

#on_message

Methods included from ActorCommon

included, #on_event

Constructor Details

#initialize(listener, queue_config, exception_handler_class) ⇒ Subscriber

Public: Creates a new Subscriber instance.

listener - Object to delegate new messages to. queue_config - A QueueConfig value object.



11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/proletariat/subscriber.rb', line 11

def initialize(listener, queue_config, exception_handler_class)
  @listener                = listener
  @queue_config            = queue_config
  @exception_handler_class = exception_handler_class

  bind_queue
  start_consumer
  exception_handler

  @ticker = Concurrent::TimerTask.execute(execution: 5, timeout: 2) do
    acknowledge_messages
  end
end

Instance Method Details

#cleanupObject

Internal: Called on actor termination. Used to stop consumption off the

queue and end the ticker.

Returns nil.



29
30
31
32
33
34
35
# File 'lib/proletariat/subscriber.rb', line 29

def cleanup
  @ticker.kill   if @ticker
  stop_consumer  if @consumer
  @channel.close if @channel && channel.open?

  nil
end