Class: RocketJob::Event

Inherits:
Object
  • Object
show all
Includes:
Mongoid::Timestamps, Plugins::Document, SemanticLogger::Loggable
Defined in:
lib/rocket_job/event.rb

Overview

RocketJob::Event

Publish and Subscribe to events. Events are published immediately and usually consumed almost immediately by all subscriber processes.

Constant Summary collapse

ALL_EVENTS =
"*".freeze

Class Method Summary collapse

Class Method Details

.create_capped_collection(size: capped_collection_size) ⇒ Object

Create the capped collection only if it does not exist. Drop the collection before calling this method to re-create it.



99
100
101
102
103
104
105
# File 'lib/rocket_job/event.rb', line 99

def self.create_capped_collection(size: capped_collection_size)
  if collection_exists?
    convert_to_capped_collection(size) unless collection.capped?
  else
    collection.client[collection_name, {capped: true, size: size}].create
  end
end

.listener(time: @load_time) ⇒ Object

Indefinitely tail the capped collection looking for new events.

time: the start time from which to start looking for new events.


86
87
88
89
90
91
92
93
94
95
# File 'lib/rocket_job/event.rb', line 86

def self.listener(time: @load_time)
  Thread.current.name = "rocketjob event"
  create_capped_collection

  logger.info("Event listener started")
  tail_capped_collection(time) { |event| process_event(event) }
rescue Exception => e
  logger.error("#listener Event listener is terminating due to unhandled exception", e)
  raise(e)
end

.subscribe(subscriber) ⇒ Object

Add a subscriber for its events. Returns a handle to the subscription that can be used to unsubscribe this particular subscription

Example: def MySubscriber

include RocketJob::Subscriber

def hello
  logger.info "Hello Action Received"
end

def show(message:)
  logger.info "Received: #{message}"
end

end

MySubscriber.subscribe



66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/rocket_job/event.rb', line 66

def self.subscribe(subscriber)
  if block_given?
    begin
      handle = add_subscriber(subscriber)
      yield(subscriber)
    ensure
      unsubscribe(handle) if handle
    end
  else
    add_subscriber(subscriber)
  end
end

.unsubscribe(handle) ⇒ Object

Unsubscribes a previous subscription



80
81
82
# File 'lib/rocket_job/event.rb', line 80

def self.unsubscribe(handle)
  @subscribers.each_value { |v| v.delete_if { |i| i.object_id == handle } }
end