Class: RocketJob::Event
- Inherits:
- Object
- Hierarchy: Object > RocketJob::Event
- Includes:
- Mongoid::Timestamps, Plugins::Document, SemanticLogger::Loggable
- Defined in:
- lib/rocket_job/event.rb
Overview
RocketJob::Event
Publish and Subscribe to events. Events are published immediately and usually consumed almost immediately by all subscriber processes.
Constant Summary
- ALL_EVENTS = '*'.freeze
Class Method Summary
-
.create_capped_collection(size: capped_collection_size) ⇒ Object
Create the capped collection only if it does not exist.
-
.listener(time: @load_time) ⇒ Object
Indefinitely tail the capped collection looking for new events.
-
.subscribe(subscriber) ⇒ Object
Add a subscriber for its events.
-
.unsubscribe(handle) ⇒ Object
Unsubscribes a previous subscription.
Class Method Details
.create_capped_collection(size: capped_collection_size) ⇒ Object
Create the capped collection only if it does not exist. Drop the collection before calling this method to re-create it.
# File 'lib/rocket_job/event.rb', line 99
def self.create_capped_collection(size: capped_collection_size)
  if collection_exists?
    convert_to_capped_collection(size) unless collection.capped?
  else
    collection.client[collection_name, {capped: true, size: size}].create
  end
end
.listener(time: @load_time) ⇒ Object
Indefinitely tail the capped collection looking for new events.
time: the time from which to start looking for new events.
# File 'lib/rocket_job/event.rb', line 86
def self.listener(time: @load_time)
  Thread.current.name = 'rocketjob event'
  create_capped_collection

  logger.info('Event listener started')
  tail_capped_collection(time) { |event| process_event(event) }
rescue Exception => exc
  logger.error('#listener Event listener is terminating due to unhandled exception', exc)
  raise(exc)
end
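The log-and-re-raise shape of the listener above can be tried without MongoDB. The following is only a sketch under stated assumptions: a `Queue` stands in for the capped collection, `Queue#pop` stands in for the tailing cursor, and the "malformed event" check is invented purely to trigger the error path. None of these names come from Rocket Job.

```ruby
require "logger"
require "stringio"

# Capture log output in memory so the sketch has no side effects.
log_output = StringIO.new
logger     = Logger.new(log_output)

# Hypothetical in-memory stand-in for the capped collection.
events   = Queue.new
received = []

listener = Thread.new do
  Thread.current.name = "event listener sketch"
  Thread.current.report_on_exception = false
  begin
    logger.info("Event listener started")
    # Queue#pop blocks until an event arrives, loosely like tailing a collection.
    loop do
      event = events.pop
      raise "malformed event" unless event.is_a?(Hash)
      received << event
    end
  rescue Exception => exc
    # Same idea as the source: log why the listener is stopping, then re-raise.
    logger.error("Event listener is terminating due to unhandled exception: #{exc.message}")
    raise
  end
end

events << {name: "hello"}
events << :not_an_event

# Thread#join re-raises the exception that terminated the thread.
error = begin
  listener.join
  nil
rescue RuntimeError => e
  e
end
```

Because the exception is re-raised after logging, the code that started the listener thread still sees the failure and can decide whether to restart it.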
.subscribe(subscriber) ⇒ Object
Add a subscriber for its events. Returns a handle to the subscription that can be used to unsubscribe this particular subscription. When a block is given, the subscriber is unsubscribed automatically once the block completes.
Example:
  class MySubscriber
    include RocketJob::Subscriber

    def hello
      logger.info "Hello Action Received"
    end

    def show(message:)
      logger.info "Received: #{message}"
    end
  end

  MySubscriber.subscribe
# File 'lib/rocket_job/event.rb', line 66
def self.subscribe(subscriber)
  if block_given?
    begin
      handle = add_subscriber(subscriber)
      yield(subscriber)
    ensure
      unsubscribe(handle) if handle
    end
  else
    add_subscriber(subscriber)
  end
end
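To see how the block and non-block forms of `.subscribe` differ, the control flow above can be reproduced in a small standalone class. This is a sketch, not Rocket Job's implementation: `RegistrySketch`, `DemoSubscriber`, and the body of `add_subscriber` are hypothetical, and the assumption that subscribers are grouped by class name and identified by `object_id` is inferred only from the `.unsubscribe` source.

```ruby
# Hypothetical stand-in for RocketJob::Event; needs neither MongoDB nor Rocket Job.
class RegistrySketch
  @subscribers = Hash.new { |hash, key| hash[key] = [] }

  class << self
    attr_reader :subscribers

    # Mirrors the control flow of .subscribe shown above.
    def subscribe(subscriber)
      if block_given?
        begin
          handle = add_subscriber(subscriber)
          yield(subscriber)
        ensure
          unsubscribe(handle) if handle
        end
      else
        add_subscriber(subscriber)
      end
    end

    def unsubscribe(handle)
      @subscribers.each_value { |list| list.delete_if { |item| item.object_id == handle } }
    end

    private

    # Assumption: group by class name and use object_id as the handle.
    def add_subscriber(subscriber)
      @subscribers[subscriber.class.name] << subscriber
      subscriber.object_id
    end
  end
end

class DemoSubscriber; end

# Block form: subscribed only for the duration of the block.
count_inside_block = nil
RegistrySketch.subscribe(DemoSubscriber.new) do |_subscriber|
  count_inside_block = RegistrySketch.subscribers["DemoSubscriber"].size
end
count_after_block = RegistrySketch.subscribers["DemoSubscriber"].size

# Non-block form: the caller keeps the handle and unsubscribes explicitly.
handle = RegistrySketch.subscribe(DemoSubscriber.new)
count_after_plain = RegistrySketch.subscribers["DemoSubscriber"].size
RegistrySketch.unsubscribe(handle)
count_after_unsubscribe = RegistrySketch.subscribers["DemoSubscriber"].size
```

The `ensure` clause is what makes the block form convenient in tests or short-lived scripts: the subscription is removed even if the block raises.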
.unsubscribe(handle) ⇒ Object
Unsubscribes a previous subscription, identified by the handle returned from .subscribe.
# File 'lib/rocket_job/event.rb', line 80
def self.unsubscribe(handle)
  @subscribers.each_value { |v| v.delete_if { |i| i.object_id == handle } }
end
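Because the comparison in `.unsubscribe` is on `object_id`, only the one subscriber instance behind the handle is removed, even when several subscribers share the same event name. A minimal sketch of that behavior, assuming a hash shaped like `@subscribers` (the key `"my_subscriber"` and the plain `Object` instances are hypothetical placeholders):

```ruby
# Hypothetical registry shaped like @subscribers: event name => list of subscribers.
subscribers = { "my_subscriber" => [] }

first  = Object.new
second = Object.new
subscribers["my_subscriber"] << first << second

# Assumption: the handle is the subscriber's object_id.
handle = first.object_id

# Same expression as in .unsubscribe above, applied to the local hash.
subscribers.each_value { |list| list.delete_if { |item| item.object_id == handle } }

remaining = subscribers["my_subscriber"]
```

After this runs, `remaining` holds only `second`; the other subscription is untouched.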