Class: Ribbon::EventBus::Publishers::AsyncResquePublisher

Inherits:
Publisher
  • Object
show all
Defined in:
lib/ribbon/event_bus/publishers/async_resque_publisher.rb

Defined Under Namespace

Classes: PublisherWorker

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Publisher

#config

Methods included from Mixins::HasConfig

#_has_config_config, #_has_config_load_config, #config, included

Methods included from Mixins::HasInstance

#instance, #plugins

Constructor Details

#initialize(instance = nil, params = {}) ⇒ AsyncResquePublisher

Returns a new instance of AsyncResquePublisher.



12
13
14
15
# File 'lib/ribbon/event_bus/publishers/async_resque_publisher.rb', line 12

def initialize(instance=nil, params={})
  super
  @resque = config.resque? ? config.resque : Resque
end

Instance Attribute Details

#resqueObject (readonly)

Returns the value of attribute resque.



10
11
12
# File 'lib/ribbon/event_bus/publishers/async_resque_publisher.rb', line 10

def resque
  @resque
end

Instance Method Details

#publish(event) ⇒ Object



56
57
58
59
# File 'lib/ribbon/event_bus/publishers/async_resque_publisher.rb', line 56

def publish(event)
  super
  worker.async.publish(event) unless event.subscriptions.empty?
end

#subscription_queue_formatterObject

Needs to exist for the ResquePublisher::PublisherJob to succeed.



52
53
54
# File 'lib/ribbon/event_bus/publishers/async_resque_publisher.rb', line 52

def subscription_queue_formatter
  ResquePublisher.subscription_queue_formatter(config)
end

#workerObject

The suprvisor created in the initializer will restart the PublisherWorker but there can be a short period of time when the actor returned by Celluloid::Actor is dead. To avoid that we sleep for a millisecond to give it time to create a new worker thread. We try three times before giving up.

This should ensure that it’s unlikely for a dead worker to be returned. However, if a dead worker is returned, then async calls will silently fail, allowing normal execution. This makes firing events best-effort.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/ribbon/event_bus/publishers/async_resque_publisher.rb', line 31

def worker
  # Retrieve the PublisherWorker or start the supervisor.
  w = Celluloid::Actor[worker_id] || PublisherWorker.supervise(
    args: [self],
    as: worker_id
  ).send(worker_id)

  3.times {
    if w.dead?
      sleep(0.001)
      w = Celluloid::Actor[worker_id]
    else
      break
    end
  }

  w
end

#worker_idObject



17
18
19
# File 'lib/ribbon/event_bus/publishers/async_resque_publisher.rb', line 17

def worker_id
  @__worker_id ||= "event_bus_resque_worker_#{object_id}".to_sym
end