Class: Ribbon::EventBus::Publishers::AsyncResquePublisher
- Defined in:
- lib/ribbon/event_bus/publishers/async_resque_publisher.rb
Defined Under Namespace
Classes: PublisherWorker
Instance Attribute Summary collapse
-
#resque ⇒ Object
readonly
Returns the value of attribute resque.
Instance Method Summary collapse
-
#initialize(instance = nil, params = {}) ⇒ AsyncResquePublisher
constructor
A new instance of AsyncResquePublisher.
- #publish(event) ⇒ Object
-
#subscription_queue_formatter ⇒ Object
Needs to exist for the ResquePublisher::PublisherJob to succeed.
-
#worker ⇒ Object
The suprvisor created in the initializer will restart the PublisherWorker but there can be a short period of time when the actor returned by Celluloid::Actor is dead.
- #worker_id ⇒ Object
Methods inherited from Publisher
Methods included from Mixins::HasConfig
#_has_config_config, #_has_config_load_config, #config, included
Methods included from Mixins::HasInstance
Constructor Details
#initialize(instance = nil, params = {}) ⇒ AsyncResquePublisher
Returns a new instance of AsyncResquePublisher.
12 13 14 15 |
# File 'lib/ribbon/event_bus/publishers/async_resque_publisher.rb', line 12 def initialize(instance=nil, params={}) super @resque = config.resque? ? config.resque : Resque end |
Instance Attribute Details
#resque ⇒ Object (readonly)
Returns the value of attribute resque.
10 11 12 |
# File 'lib/ribbon/event_bus/publishers/async_resque_publisher.rb', line 10 def resque @resque end |
Instance Method Details
#publish(event) ⇒ Object
56 57 58 59 |
# File 'lib/ribbon/event_bus/publishers/async_resque_publisher.rb', line 56 def publish(event) super worker.async.publish(event) unless event.subscriptions.empty? end |
#subscription_queue_formatter ⇒ Object
Needs to exist for the ResquePublisher::PublisherJob to succeed.
52 53 54 |
# File 'lib/ribbon/event_bus/publishers/async_resque_publisher.rb', line 52 def subscription_queue_formatter ResquePublisher.subscription_queue_formatter(config) end |
#worker ⇒ Object
The suprvisor created in the initializer will restart the PublisherWorker but there can be a short period of time when the actor returned by Celluloid::Actor is dead. To avoid that we sleep for a millisecond to give it time to create a new worker thread. We try three times before giving up.
This should ensure that it’s unlikely for a dead worker to be returned. However, if a dead worker is returned, then async calls will silently fail, allowing normal execution. This makes firing events best-effort.
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/ribbon/event_bus/publishers/async_resque_publisher.rb', line 31 def worker # Retrieve the PublisherWorker or start the supervisor. w = Celluloid::Actor[worker_id] || PublisherWorker.supervise( args: [self], as: worker_id ).send(worker_id) 3.times { if w.dead? sleep(0.001) w = Celluloid::Actor[worker_id] else break end } w end |
#worker_id ⇒ Object
17 18 19 |
# File 'lib/ribbon/event_bus/publishers/async_resque_publisher.rb', line 17 def worker_id @__worker_id ||= "event_bus_resque_worker_#{object_id}".to_sym end |