Class: RocketJob::Event

Inherits:
Object
  • Object
show all
Includes:
Mongoid::Timestamps, Plugins::Document, SemanticLogger::Loggable
Defined in:
lib/rocket_job/event.rb

Overview

RocketJob::Event

Publish and Subscribe to events. Events are published immediately and usually consumed almost immediately by all subscriber processes.

Constant Summary collapse

# Subscription key for catch-all subscribers: .process_event also delivers
# every event to the subscribers registered under this name.
ALL_EVENTS =
"*".freeze

Class Method Summary collapse

Class Method Details

.add_subscriber(subscriber) ⇒ Object



110
111
112
113
114
# File 'lib/rocket_job/event.rb', line 110

# Registers +subscriber+ under the event name declared by its class.
#
# Returns the subscriber's object_id, which is the handle that .unsubscribe
# matches against.
def self.add_subscriber(subscriber)
  event_name = subscriber.class.event_name

  # Look the list up, append, then store it back under the same key.
  # NOTE(review): the write-back suggests @subscribers hands out a default list
  # for unknown names without storing it - confirm against its initializer.
  registered = @subscribers[event_name]
  registered << subscriber
  @subscribers[event_name] = registered

  subscriber.object_id
end

.collection_exists? ⇒ Boolean

Returns:

  • (Boolean)


152
153
154
# File 'lib/rocket_job/event.rb', line 152

# Whether the database behind +collection+ already lists a collection with
# this model's collection name.
#
# Returns true or false.
def self.collection_exists?
  existing_names = collection.database.collection_names
  existing_names.include?(collection_name.to_s)
end

.convert_to_capped_collection(size) ⇒ Object

Convert a non-capped collection to capped



157
158
159
# File 'lib/rocket_job/event.rb', line 157

# Issues a "convertToCapped" database command for this model's collection.
#
# size: value passed through as the command's "size" field.
#
# Returns whatever the database command call returns.
def self.convert_to_capped_collection(size)
  database = collection.database
  database.command("convertToCapped" => collection_name.to_s, "size" => size)
end

.create_capped_collection(size: capped_collection_size) ⇒ Object

Create the capped collection only if it does not exist. Drop the collection before calling this method to re-create it.



99
100
101
102
103
104
105
# File 'lib/rocket_job/event.rb', line 99

# Makes sure the events collection exists as a capped collection.
#
# size: size given to the new or converted capped collection
#       (defaults to capped_collection_size).
#
# - Collection missing: it is created as capped.
# - Collection present but not capped: it is converted in place.
# - Collection already capped: nothing is done and nil is returned.
def self.create_capped_collection(size: capped_collection_size)
  unless collection_exists?
    return collection.client[collection_name, {capped: true, size: size}].create
  end

  return if collection.capped?

  convert_to_capped_collection(size)
end

.listener(time: @load_time) ⇒ Object

Indefinitely tail the capped collection looking for new events.

time: the start time from which to start looking for new events.


86
87
88
89
90
91
92
93
94
95
# File 'lib/rocket_job/event.rb', line 86

# Entry point for the event listener: names the current thread, makes sure
# the capped collection exists, then tails it and hands every event read to
# .process_event.
#
# time: the start time from which to start looking for new events
#       (defaults to @load_time).
#
# Any exception that escapes, including non-StandardError ones, is logged and
# then re-raised to the caller.
def self.listener(time: @load_time)
  Thread.current.name = "rocketjob event"
  create_capped_collection

  logger.info("Event listener started")
  tail_capped_collection(time) do |event|
    process_event(event)
  end
rescue Exception => e
  # Deliberately broad: record why the listener is stopping, then propagate.
  logger.error("#listener Event listener is terminating due to unhandled exception", e)
  raise
end

.process_event(event) ⇒ Object

Process a new event, calling registered subscribers.



138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/rocket_job/event.rb', line 138

# Process a new event, calling registered subscribers.
#
# event: object responding to name, action, parameters and attributes.
#
# Subscribers registered under the event's own name receive
# process_action(action, parameters); subscribers registered under ALL_EVENTS
# receive process_event(name, action, parameters).
#
# A StandardError raised by one subscriber is logged and delivery carries on
# with the remaining subscribers, so a single faulty subscriber can no longer
# stop the others (or the ALL_EVENTS subscribers) from seeing the event.
# StandardErrors raised outside subscriber calls are logged and swallowed too.
def self.process_event(event)
  logger.info("Event Received", event.attributes)

  # key? is checked first so the lookup is only made for names that are
  # already registered.
  if @subscribers.key?(event.name)
    @subscribers[event.name].each do |subscriber|
      begin
        subscriber.process_action(event.action, event.parameters)
      rescue StandardError => e
        logger.error("Unknown subscriber. Continuing..", e)
      end
    end
  end

  if @subscribers.key?(ALL_EVENTS)
    @subscribers[ALL_EVENTS].each do |subscriber|
      begin
        subscriber.process_event(event.name, event.action, event.parameters)
      rescue StandardError => e
        logger.error("Unknown subscriber. Continuing..", e)
      end
    end
  end
rescue StandardError => e
  logger.error("Unknown subscriber. Continuing..", e)
end

.subscribe(subscriber) ⇒ Object

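Add a subscriber for its events. Returns a handle to the subscription that can be used to unsubscribe this particular subscription. When a block is given, the subscriber is passed to the block and is automatically unsubscribed once the block completes.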

Example: class MySubscriber

include RocketJob::Subscriber

def hello
  logger.info "Hello Action Received"
end

def show(message:)
  logger.info "Received: #{message}"
end

end

MySubscriber.subscribe



66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/rocket_job/event.rb', line 66

# Subscribes +subscriber+ to its events.
#
# Without a block: returns the handle from .add_subscriber, to be passed to
# .unsubscribe later.
#
# With a block: the subscriber is registered, yielded to the block, and then
# always unsubscribed again, even when the block raises. The block's value is
# returned.
def self.subscribe(subscriber)
  return add_subscriber(subscriber) unless block_given?

  handle = nil
  begin
    handle = add_subscriber(subscriber)
    yield(subscriber)
  ensure
    # handle is still nil when registration itself failed.
    unsubscribe(handle) if handle
  end
end

.tail_capped_collection(time) ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/rocket_job/event.rb', line 116

# Tails the collection with a tailable-await cursor, yielding an Event for
# every document whose created_at is later than +time+.
#
# time: only documents created after this time are read.
#
# On socket, socket-timeout, operation-failure or Timeout errors the failure
# is logged and the whole query is re-issued. +time+ is advanced as events are
# read, so the new cursor starts after the last event read.
def self.tail_capped_collection(time)
  with(socket_timeout: long_poll_seconds + 10) do
    newer_than   = {created_at: {"$gt" => time}}
    tailing_view = collection.find(newer_than).await_data.cursor_type(:tailable_await)
    tailing_view = tailing_view.max_await_time_ms(long_poll_seconds * 1000).sort("$natural" => 1)

    tailing_view.each do |doc|
      event = Mongoid::Factory.from_db(Event, doc)
      # Remember how far we got so a retry resumes after this event.
      time = event.created_at
      yield(event)
    end
  end
rescue Mongo::Error::SocketError, Mongo::Error::SocketTimeoutError, Mongo::Error::OperationFailure, Timeout::Error => e
  logger.info("Creating a new cursor and trying again: #{e.class.name} #{e.message}")
  retry
end

.unsubscribe(handle) ⇒ Object

Unsubscribes a previous subscription



80
81
82
# File 'lib/rocket_job/event.rb', line 80

# Unsubscribes a previous subscription.
#
# handle: the value returned by .subscribe / .add_subscriber (the subscriber's
#         object_id).
#
# Every registered list is scanned and any subscriber whose object_id equals
# the handle is removed in place. An unknown handle removes nothing.
def self.unsubscribe(handle)
  @subscribers.each_value do |registered|
    registered.delete_if { |subscriber| subscriber.object_id == handle }
  end
end