Class: Sidekiq::Throttled::Communicator::Callbacks

Inherits:
Object
Includes:
ExceptionHandler
Defined in:
lib/sidekiq/throttled/communicator/callbacks.rb

Overview

Callbacks registry and runner. Runs registered callbacks in a dedicated Fiber, which works around a problem that arises when ConnectionPool is combined with a Redis client in subscriber mode.

Once a Redis client enters subscriber mode via the `#subscribe` method, it can only issue pub/sub commands or quit. ConnectionPool binds a reserved client to the current Thread, so nested `#with` calls inside the same thread yield the same connection. That makes it impossible to issue normal Redis commands from within the listener Thread.
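
Why a separate Fiber helps can be sketched in plain Ruby. The sketch assumes the pool remembers its reserved connection through `Thread.current[]`, which is fiber-local storage in Ruby. `TinyPool` and its connection names are hypothetical stand-ins written for illustration, not the actual ConnectionPool code.

```ruby
# Hypothetical stand-in for a pool that remembers the connection reserved by
# the current execution context. Not the real ConnectionPool implementation.
class TinyPool
  def initialize
    @next_id = 0
    @key     = :"tiny-pool-#{object_id}"
  end

  # Yields the connection already reserved in this context,
  # or reserves a fresh one for the duration of the block.
  def with
    reserved = Thread.current[@key]
    return yield(reserved) if reserved

    conn = "conn-#{@next_id += 1}"
    Thread.current[@key] = conn
    begin
      yield conn
    ensure
      Thread.current[@key] = nil
    end
  end
end

pool = TinyPool.new
seen = {}

pool.with do |outer|
  seen[:outer] = outer

  # A nested call in the same context reuses the reserved connection.
  pool.with { |nested| seen[:nested] = nested }

  # Thread#[] is fiber-local, so a new Fiber starts with nothing reserved
  # and receives a different connection.
  Fiber.new { pool.with { |in_fiber| seen[:fiber] = in_fiber } }.resume
end

p seen
```

Under that assumption, the nested call sees the outer connection (the one that would be stuck in subscriber mode), while the block run inside the Fiber gets a connection of its own.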

Constructor Details

#initialize ⇒ Callbacks

Initializes callbacks registry.


# File 'lib/sidekiq/throttled/communicator/callbacks.rb', line 27

def initialize
  @mutex    = Mutex.new
  @handlers = Hash.new { |h, k| h[k] = [] }
end

Instance Method Details

#on(event) {|*args| ... } ⇒ self

Registers a handler for the given event.

Examples:


callbacks.on "and out comes wolves" do |who|
  puts "#{who} let the dogs out?!"
end

Parameters:

  • event (#to_s)

Yields:

  • (*args)

    Runs given block upon `event`

Yield Returns:

  • (void)

Returns:

  • (self)

Raises:

  • (ArgumentError)

    if no handler block is given


# File 'lib/sidekiq/throttled/communicator/callbacks.rb', line 45

def on(event, &handler)
  raise ArgumentError, "No block given" unless handler
  @mutex.synchronize { @handlers[event.to_s] << handler }
  self
end
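
The behaviour of `#on` listed above can be tried without loading the gem. In this minimal sketch `DemoRegistry` is a hypothetical stand-in that copies only the constructor and `#on`; the `handlers` reader is added purely for inspection and is not part of the documented class.

```ruby
# Hypothetical stand-in mirroring the constructor and #on shown above.
class DemoRegistry
  attr_reader :handlers # added for inspection only

  def initialize
    @mutex    = Mutex.new
    @handlers = Hash.new { |h, k| h[k] = [] }
  end

  def on(event, &handler)
    raise ArgumentError, "No block given" unless handler
    @mutex.synchronize { @handlers[event.to_s] << handler }
    self
  end
end

registry = DemoRegistry.new

# #on returns self, so registrations can be chained. :ready and "ready"
# end up under one key because the event name is normalized with #to_s.
registry
  .on(:ready)  { |payload| puts "first: #{payload}" }
  .on("ready") { |payload| puts "second: #{payload}" }

# Calling #on without a block raises ArgumentError.
error_message = nil
begin
  registry.on(:ready)
rescue ArgumentError => e
  error_message = e.message
end
```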

#run(event, payload = nil) ⇒ void

This method returns an undefined value.

Runs the handlers registered for the given event, passing the payload to each.

Parameters:

  • event (#to_s)
  • payload (Object) (defaults to: nil)

# File 'lib/sidekiq/throttled/communicator/callbacks.rb', line 56

def run(event, payload = nil)
  @mutex.synchronize do
    Fiber.new do
      @handlers[event.to_s].each do |callback|
        begin
          callback.call(payload)
        rescue => e
          handle_exception(e, {
            :context => "sidekiq:throttled"
          })
        end
      end
    end.resume
  end
end
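
The listing above suggests that handlers run in registration order and that an exception in one handler does not prevent later handlers from running. The sketch below illustrates this with `DemoCallbacks`, a hypothetical stand-in that copies `#on` and `#run` from the listings. Its `handle_exception` simply collects errors in an array, which is an assumption made for the demo and not the behaviour of the real `ExceptionHandler`.

```ruby
# Hypothetical stand-in mirroring #on and #run from the listings above.
class DemoCallbacks
  attr_reader :errors # added for inspection only

  def initialize
    @mutex    = Mutex.new
    @handlers = Hash.new { |h, k| h[k] = [] }
    @errors   = []
  end

  def on(event, &handler)
    raise ArgumentError, "No block given" unless handler
    @mutex.synchronize { @handlers[event.to_s] << handler }
    self
  end

  def run(event, payload = nil)
    @mutex.synchronize do
      Fiber.new do
        @handlers[event.to_s].each do |callback|
          begin
            callback.call(payload)
          rescue => e
            handle_exception(e, { :context => "sidekiq:throttled" })
          end
        end
      end.resume
    end
  end

  private

  # Assumption for the demo: record the error instead of reporting it.
  def handle_exception(error, _context)
    @errors << error
  end
end

callbacks = DemoCallbacks.new
log = []

callbacks
  .on(:tick) { |n| log << "a:#{n}" }
  .on(:tick) { |_n| raise "boom" }
  .on(:tick) { |n| log << "c:#{n}" }

callbacks.run(:tick, 1)  # first and third handlers append to log
callbacks.run(:unknown)  # no handlers registered, nothing runs
```

After both calls, `log` holds the entries from the first and third handlers, and `callbacks.errors` holds the single exception raised by the second.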