Class: Cuniculus::Dispatcher

Inherits:
Object
Defined in:
lib/cuniculus/dispatcher.rb

Overview

The dispatcher forwards jobs to a worker pool, which publishes them to RabbitMQ. It holds a RabbitMQ session and, when one of its workers reports a network exception, tries to reestablish the connection and restart the pool.

The dispatcher background thread, which monitors for connection errors, is started when the first job is enqueued by a Worker.

Constant Summary

ENFORCED_CONN_OPTS =
{
  threaded: false, # No need for a reader thread, since this connection is only used for publishing
  automatically_recover: false,
  logger: ::Logger.new(IO::NULL)
}.freeze
RECOVERABLE_ERRORS =
[AMQ::Protocol::Error, ::Bunny::Exception, Errno::ECONNRESET].freeze
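
As a minimal sketch of how the enforced options take precedence, the snippet below merges a hypothetical set of user-supplied connection options with a hash shaped like ENFORCED_CONN_OPTS. The `user_opts` hash and its host name are assumptions made for illustration; only the `Hash#merge` ordering mirrors what `#start!` does.

```ruby
require "logger"

# Hypothetical user-supplied options; keys and values are illustrative only.
user_opts = { host: "rabbitmq.example", automatically_recover: true, threaded: true }

# Same shape as ENFORCED_CONN_OPTS, redefined here so the snippet runs on its own.
enforced = {
  threaded: false,
  automatically_recover: false,
  logger: Logger.new(IO::NULL)
}.freeze

# Hash#merge gives precedence to its argument, so the enforced values
# override anything the user configured under the same keys.
effective = user_opts.merge(enforced)
```

Because the enforced hash is merged last, a user cannot re-enable automatic recovery or the reader thread for the publishing connection, while unrelated keys such as `:host` pass through unchanged.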

Constructor Details

#initialize(config) ⇒ Dispatcher

Instantiates a dispatcher using the passed Config.

Parameters:

  • config (Config)

# File 'lib/cuniculus/dispatcher.rb', line 27

def initialize(config)
  @config = config
  @conn = nil
  @job_queue = Queue.new
  @dispatcher_chan = Queue.new
  @shutdown = false
  @workers = config.pub_pool_size.times.map do |i|
    Cuniculus::PubWorker.new(config, @job_queue, @dispatcher_chan)
  end
  @reconnect_attempts = config.pub_reconnect_attempts
  @reconnect_delay = config.pub_reconnect_delay
  @reconnect_delay_max = config.pub_reconnect_delay_max
  @shutdown_grace_period = config.pub_shutdown_grace_period
  @thread = nil
  @shutdown = false
end

Instance Attribute Details

#dispatcher_chan ⇒ Object (readonly)

Returns the value of attribute dispatcher_chan.



# File 'lib/cuniculus/dispatcher.rb', line 22

def dispatcher_chan
  @dispatcher_chan
end

#job_queue ⇒ Object (readonly)

Returns the value of attribute job_queue.



# File 'lib/cuniculus/dispatcher.rb', line 22

def job_queue
  @job_queue
end

#reconnect_attempts ⇒ Object (readonly)

Returns the value of attribute reconnect_attempts.



# File 'lib/cuniculus/dispatcher.rb', line 22

def reconnect_attempts
  @reconnect_attempts
end

#reconnect_delay ⇒ Object (readonly)

Returns the value of attribute reconnect_delay.



# File 'lib/cuniculus/dispatcher.rb', line 22

def reconnect_delay
  @reconnect_delay
end

#reconnect_delay_max ⇒ Object (readonly)

Returns the value of attribute reconnect_delay_max.



# File 'lib/cuniculus/dispatcher.rb', line 22

def reconnect_delay_max
  @reconnect_delay_max
end

#shutdown_grace_period ⇒ Object (readonly)

Returns the value of attribute shutdown_grace_period.



# File 'lib/cuniculus/dispatcher.rb', line 22

def shutdown_grace_period
  @shutdown_grace_period
end

Instance Method Details

#alive? ⇒ Boolean

Returns whether the dispatcher's background thread is running.

Returns:

  • (Boolean)


# File 'lib/cuniculus/dispatcher.rb', line 85

def alive?
  @thread&.alive? || false
end

#describe(log_level = Logger::DEBUG) ⇒ Object



# File 'lib/cuniculus/dispatcher.rb', line 44

def describe(log_level = Logger::DEBUG)
  Cuniculus.logger.info @thread&.backtrace
  @workers.each do |w|
    Cuniculus.logger.log(log_level, w.instance_variable_get(:@thread)&.backtrace)
  end
end

#recover_from_net_error ⇒ void

This method returns an undefined value.

Starts the connection to RabbitMQ, then starts the workers' background threads.

If it fails to connect, it keeps retrying for a number of attempts defined by Config#pub_reconnect_attempts. For unlimited retries, set this value to `:infinite`.

The time between reconnect attempts follows an exponential backoff formula:

    t = delay * 2^(n-1)

where n is the attempt number and delay is defined by Config#pub_reconnect_delay.

If Config#pub_reconnect_delay_max is defined, it works as a cap for the above time.
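
The backoff rule described above can be sketched as a small standalone helper. `backoff_delay` is a hypothetical name that is not part of Cuniculus, and the numbers are made up; the sketch follows the documented formula with a 1-based attempt number and treats the cap as optional.

```ruby
# Hypothetical helper, not part of Cuniculus: seconds to wait before
# reconnect attempt number `attempt` (1-based), per t = delay * 2^(n-1).
def backoff_delay(delay, attempt, delay_max = nil)
  t = delay * 2**(attempt - 1)
  # Apply the cap only when one is configured.
  delay_max ? [t, delay_max].min : t
end

# With a base delay of 2 seconds and a cap of 30 seconds:
delays = (1..6).map { |n| backoff_delay(2, n, 30) }
# delays is [2, 4, 8, 16, 30, 30]
```

The wait doubles on each attempt until it reaches the cap, after which every further attempt waits the capped time.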



# File 'lib/cuniculus/dispatcher.rb', line 104

def recover_from_net_error
  attempt = 0
  begin
    @conn.start
    Cuniculus.logger.info("Connection established")

    @workers.each { |w| w.start!(@conn) }
  rescue *RECOVERABLE_ERRORS => ex
    handle_error(Cuniculus.convert_exception_class(ex, Cuniculus::RMQConnectionError))
    sleep_time = @shutdown ? 1 : [(reconnect_delay * 2**(attempt-1)), reconnect_delay_max].min
    sleep sleep_time
    attempt += 1

    retry if @shutdown && attempt <= reconnect_delay_max
    retry if reconnect_attempts == :infinite || attempt <= reconnect_attempts
  end
end

#shutdown ⇒ void

This method returns an undefined value.

Shuts down workers, giving them time to conclude outstanding tasks.

Shutdown is forced after Config#pub_shutdown_grace_period seconds.
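
The grace-period behaviour can be illustrated with plain threads and a queue. This is a simplified sketch, not the library's implementation: the worker loop, the 5 second grace period, and the short polling interval are assumptions chosen so the example finishes quickly.

```ruby
jobs = Queue.new
processed = Queue.new

# Stand-ins for publisher workers: consume jobs until told to shut down.
workers = 2.times.map do
  Thread.new do
    loop do
      job = jobs.pop
      break if job == :shutdown
      processed << job
    end
  end
end

3.times { |i| jobs << i }

grace_period = 5 # seconds; hypothetical value
t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
elapsed = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 }

# Phase 1: let the workers drain the queue, within the grace period.
sleep 0.01 until elapsed.call > grace_period || jobs.empty?

# Phase 2: send one :shutdown token per live worker until all have
# exited or the grace period has run out.
until elapsed.call > grace_period || (alive = workers.count(&:alive?)).zero?
  alive.times { jobs << :shutdown }
  sleep 0.01
end

remaining = workers.count(&:alive?)
```

Any worker still alive once the grace period has elapsed would be counted in `remaining`, which corresponds to the case where the dispatcher logs a forced shutdown.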



# File 'lib/cuniculus/dispatcher.rb', line 127

def shutdown
  Cuniculus.logger.info("Cuniculus: Shutting down dispatcher")
  @shutdown = true
  alive_size = @workers.size
  shutdown_t0 = Cuniculus.mark_time

  sleep 1 until Cuniculus.mark_time - shutdown_t0 > shutdown_grace_period || @job_queue.empty?

  until Cuniculus.mark_time - shutdown_t0 > shutdown_grace_period || (alive_size = @workers.select(&:alive?).size) == 0
    sleep 1
    alive_size.times { @job_queue << :shutdown }
  end

  @dispatcher_chan << :shutdown
  alive_size = @workers.select(&:alive?).size
  return unless alive_size > 0

  Cuniculus.logger.warn("Cuniculus: Forcing shutdown with #{alive_size} workers remaining")
  describe
end

#start! ⇒ Object

Starts a thread responsible for reestablishing lost RabbitMQ connections and restarting PubWorkers.

It keeps track of the last time it had to reconnect, in case it receives outdated messages of failed connections from workers.

PubWorkers communicate with it through its `dispatcher_chan` queue. Depending on the message fetched from the dispatcher channel, it takes different actions:

  • when a :shutdown message is received, it waits until current jobs are finished (up to the configured `shutdown_grace_period`) and stops its background thread.

  • when a timestamp is received that is not later than the last reconnect timestamp, the message is ignored.

  • when the timestamp is later than the last reconnect timestamp, it tries to reestablish the connection to RabbitMQ and restarts its workers.

Note that the first time the dispatcher is started, it sends a message to its own background thread with a timestamp to trigger the first connection.
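
The message handling described above can be sketched with a plain Queue and a monitor thread. This is an illustration under assumptions, not the library code: the local `mark_time` stands in for Cuniculus.mark_time and is assumed to behave like a monotonic clock reading, and the counter replaces the real reconnect logic.

```ruby
# Stand-in for Cuniculus.mark_time (assumption: a monotonic clock reading).
def mark_time
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

chan = Queue.new
reconnects = 0

monitor = Thread.new do
  last_connect_time = 0
  loop do
    msg = chan.pop                       # blocks until a message arrives
    break if msg == :shutdown            # stop the background thread
    next unless msg > last_connect_time  # stale report, already handled

    reconnects += 1                      # placeholder for reconnecting
    last_connect_time = mark_time
  end
end

failure_time = mark_time
chan << failure_time # triggers a reconnect
chan << failure_time # same failure reported by another worker: ignored
chan << :shutdown
monitor.join
```

Two workers reporting the same failure lead to a single reconnect, since the second timestamp is no later than the moment the connection was last reestablished.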



# File 'lib/cuniculus/dispatcher.rb', line 65

def start!
  return if @shutdown || @thread&.alive?
  @thread = Thread.new do
    last_connect_time = 0
    loop do
      disconnect_time = @dispatcher_chan.pop
      break if disconnect_time == :shutdown
      if disconnect_time > last_connect_time
        recover_from_net_error
        last_connect_time = Cuniculus.mark_time
      end
    end
  end
  @conn = ::Bunny.new(@config.rabbitmq_opts.merge(ENFORCED_CONN_OPTS).merge(session_error_handler: @thread))
  @dispatcher_chan << Cuniculus.mark_time
end