Class: Cuniculus::PubWorker

Inherits:
Object
  • Object
show all
Defined in:
lib/cuniculus/pub_worker.rb

Overview

Each PubWorker maintains a background thread in a loop, fetching jobs reaching its job queue and publishing the payloads to RabbitMQ. They are not instantiated directly, but are rather created and managed by a Dispatcher.

Instance Method Summary collapse

Constructor Details

#initialize(config, job_queue, dispatcher_chan) ⇒ PubWorker

Returns a new instance of PubWorker.



10
11
12
13
14
15
16
# File 'lib/cuniculus/pub_worker.rb', line 10

def initialize(config, job_queue, dispatcher_chan)
  @config = config
  @job_queue = job_queue
  @dispatcher_chan = dispatcher_chan
  @mutex = Mutex.new
  @thread = nil
end

Instance Method Details

#alive?Boolean

Whether the background thread is running.

Returns:

  • (Boolean)


40
41
42
# File 'lib/cuniculus/pub_worker.rb', line 40

def alive?
  @thread&.alive? || false
end

#start!(conn) ⇒ Object

Declares exchanges, and starts a background thread that consumes and publishes messages.

If the connection to RabbitMQ it receives is not established, or if it fails to declare the exchanges, the background thread is not started and a message is sent to the dispatcher channel with the current timestamp. The dispatcher is then responsible for trying to set the connection up again and starting each of its workers.

Parameters:

  • conn (::Bunny::Session)

    Connection to RabbitMQ. Expected to be open at this stage.



26
27
28
29
30
31
32
33
34
35
# File 'lib/cuniculus/pub_worker.rb', line 26

def start!(conn)
  return @dispatcher_chan << Cuniculus.mark_time unless conn.open?

  @channel = sync { conn.create_channel }
  @x = sync { @channel.direct(Cuniculus::CUNICULUS_EXCHANGE, { durable: true }) }
  @dlx = sync { @channel.fanout(Cuniculus::CUNICULUS_DLX_EXCHANGE, { durable: true }) }
  @thread = Thread.new { run }
rescue Bunny::Exception
  @dispatcher_chan << Cuniculus.mark_time
end