Class: Shamu::Events::ActiveRecord::Service
- Inherits: EventsService
  - Object
  - Services::Service
  - EventsService
  - Shamu::Events::ActiveRecord::Service
- Includes: ChannelStats
- Defined in: lib/shamu/events/active_record/service.rb
Overview
Store events in a database using ActiveRecord persistence layer.
Runner IDs
A globally unique id (a UUID or a well-defined internal convention that guarantees uniqueness). The runner id is used by the system to track which messages have been delivered to the subscribers hosted by that runner process. This allows dispatching to resume should the host or process die.
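A minimal sketch of producing a runner id, assuming a UUID is acceptable for your deployment. The helper name `generate_runner_id` is hypothetical and not part of Shamu; it only wraps Ruby's standard `SecureRandom.uuid`.

```ruby
require "securerandom"

# Hypothetical helper (not part of Shamu): returns a new globally unique id.
# To let dispatching resume after a restart, store the value once and reuse
# it for the same runner process instead of generating a fresh id each time.
def generate_runner_id
  SecureRandom.uuid
end

runner_id = generate_runner_id
```

A well-defined internal convention, such as a stable host and process name, would serve equally well as long as it is unique.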
Class Method Summary
- .ensure_records!
  Ensure that the tables are present in the database and have been initialized.
Instance Method Summary
- #channel_stats(name, runner_id: nil) ⇒ Hash
  Gets stats for the given channel.
- #dispatch(runner_id, *names, limit: nil) ⇒ Hash<String,Integer>
  Dispatch queued messages up to the given limit.
- #initialize ⇒ Service (constructor)
  A new instance of Service.
- #publish(channel, message)
  Publish a well-defined Message to a known channel so that any client that has subscribed will receive a copy of the message to process.
- #subscribe(channel) {|message| ... }
  Subscribe to receive notifications of events on the named channel.
Methods inherited from EventsService
bridge, create, #deserialize, #serialize
Methods inherited from Services::Service
#cache_for, #cached_lookup, #entity_list, #entity_lookup_list, #find_by_lookup, #lazy_association, #lookup_association
Constructor Details
#initialize ⇒ Service
Returns a new instance of Service.
    # File 'lib/shamu/events/active_record/service.rb', line 30
    def initialize
      self.class.ensure_records!
      @channels ||= {}
      @mutex ||= Mutex.new
      super
    end
Class Method Details
.ensure_records!
This method returns an undefined value.
Ensure that the tables are present in the database and have been initialized.
    # File 'lib/shamu/events/active_record/service.rb', line 23
    def self.ensure_records!
      return if @ensure_records
      @ensure_records = true
      Migration.new.migrate( :up )
    end
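The guard in the source above makes the setup run at most once per process. The sketch below isolates that pattern with a stand-in class. `SchemaSetup`, `run_migration`, and `migration_runs` are hypothetical names used only for illustration, and the counter takes the place of the real migration.

```ruby
# Hypothetical stand-in (not part of Shamu) showing a class-level
# run-once guard like the one used by .ensure_records!.
class SchemaSetup
  class << self
    # Number of times the (simulated) migration actually ran.
    attr_reader :migration_runs

    def ensure_records!
      return if @ensure_records # already initialized in this process

      @ensure_records = true
      run_migration
    end

    private

    # Simulates the migration by counting invocations.
    def run_migration
      @migration_runs = ( @migration_runs || 0 ) + 1
    end
  end
end

SchemaSetup.ensure_records!
SchemaSetup.ensure_records! # no-op: the guard short-circuits
```

Because the flag is a class-level instance variable, every instance created in the same process shares it, which is why the constructor can call the method unconditionally.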
Instance Method Details
#channel_stats(name, runner_id: nil) ⇒ Hash
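Gets stats for the given channel.
Stats included in the results:
- name: name of the channel.
- subscribers_count: the number of subscribers.
- queue_size: the size of the message queue. When runner_id is given and that runner has already processed messages, only messages with an id greater than the runner's last processed id are counted.
- dispatching: true if the channel is currently dispatching messages.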
    # File 'lib/shamu/events/active_record/service.rb', line 78
    def channel_stats( name, runner_id: nil )
      channel = fetch_channel( name )
      queue   = Message.where( channel_id: channel[:id] )

      if runner_id && ( runner = create_runner( runner_id ) )
        if runner.last_processed_id
          queue = queue.where( Message.arel_table[ :id ].gt( runner.last_processed_id ) )
        end
      end

      {
        name: name,
        subscribers_count: channel[:subscribers].size,
        dispatching: channel[:dispatching],
        queue_size: queue.count
      }
    end
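To illustrate how the queue_size value narrows when a runner is supplied, here is a plain-Ruby sketch of the same filtering logic. It uses arrays instead of ActiveRecord; `QueuedMessage` and `pending_count` are hypothetical names, not part of Shamu.

```ruby
# Hypothetical in-memory stand-in for a row in the messages table.
QueuedMessage = Struct.new( :id, :channel_id )

# Counts the messages on a channel. When last_processed_id is given, only
# messages with a greater id are counted, mirroring the `id > last_processed_id`
# condition in the source above.
def pending_count( messages, channel_id, last_processed_id = nil )
  queue = messages.select { |m| m.channel_id == channel_id }
  queue = queue.select { |m| m.id > last_processed_id } if last_processed_id
  queue.size
end

messages = [
  QueuedMessage.new( 1, 7 ),
  QueuedMessage.new( 2, 7 ),
  QueuedMessage.new( 3, 9 ),
  QueuedMessage.new( 4, 7 )
]

pending_count( messages, 7 )    # every message on channel 7
pending_count( messages, 7, 1 ) # only channel 7 messages with id > 1
```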
#dispatch(runner_id, *names, limit: nil) ⇒ Hash<String,Integer>
Dispatch queued messages up to the given limit. Once all the messages are dispatched, the method returns. A long-running process might periodically call dispatch in a loop, trapping SIGINT to shut down.
    # File 'lib/shamu/events/active_record/service.rb', line 66
    def dispatch( runner_id, *names, limit: nil )
      fail UnknownRunnerError unless runner_id.present?
      names = channels.keys unless channels.present?

      names.each_with_object( {} ) do |name, dispatched|
        state = fetch_channel( name )
        dispatched[name] = dispatch_channel( state, "#{ runner_id }::#{ name }", limit )
      end
    end
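The long-running loop described above could be sketched as follows. `DispatchLoop` and its methods are hypothetical and not part of Shamu. The sketch assumes only that the service responds to `dispatch( runner_id )` and returns a hash of counts per channel, as documented.

```ruby
# Hypothetical helper (not part of Shamu): calls service.dispatch in a loop
# until asked to stop.
class DispatchLoop
  def initialize( service, runner_id, idle_sleep: 1 )
    @service    = service
    @runner_id  = runner_id
    @idle_sleep = idle_sleep
    @running    = true
  end

  # Request a shutdown; the loop exits after the current dispatch call.
  def stop!
    @running = false
  end

  # Trap SIGINT (Ctrl-C) so the loop shuts down cleanly.
  def install_interrupt_handler
    Signal.trap( "INT" ) { stop! }
  end

  def run
    while @running
      dispatched = @service.dispatch( @runner_id )
      # Back off briefly when no channel had anything to deliver.
      sleep @idle_sleep if @running && dispatched.values.sum.zero?
    end
  end
end
```

A process would typically build the loop with its service and runner id, call `install_interrupt_handler`, and then call `run`. The handler only flips a flag, which keeps the work done inside the signal handler minimal.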
#publish(channel, message)
This method returns an undefined value.
Publish a well-defined Message to a known channel so that any client that has subscribed will receive a copy of the message to process.
Events are delivered asynchronously. There is no guarantee that a subscriber has received or processed a message.
    # File 'lib/shamu/events/active_record/service.rb', line 39
    def publish( channel, message )
      channel_id = fetch_channel( channel )[:id]
      Message.create! channel_id: channel_id, message: serialize( message )
    end
#subscribe(channel) {|message| ... }
This method returns an undefined value.
Subscribe to receive notifications of events on the named channel. Any time a publisher pushes a message, the callback will be invoked with a copy of the message.
    # File 'lib/shamu/events/active_record/service.rb', line 45
    def subscribe( channel, &callback )
      state = fetch_channel( channel )
      mutex.synchronize do
        state[:subscribers] << callback
      end
    end
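To show how subscribe, publish, and dispatch fit together, the sketch below mimics the documented interface entirely in memory. `SketchEvents` is a hypothetical stand-in, not the Shamu implementation: it persists nothing, ignores the runner id, and omits the limit option.

```ruby
# Hypothetical in-memory stand-in that mirrors the documented interface.
class SketchEvents
  def initialize
    @mutex    = Mutex.new
    @channels = Hash.new { |h, name| h[name] = { subscribers: [], queue: [] } }
  end

  # Register a callback for messages on the named channel.
  def subscribe( channel, &callback )
    @mutex.synchronize { @channels[channel][:subscribers] << callback }
  end

  # Queue a message; nothing is delivered until dispatch is called.
  def publish( channel, message )
    @mutex.synchronize { @channels[channel][:queue] << message }
  end

  # Deliver queued messages and return the number delivered per channel.
  def dispatch( _runner_id, *names )
    names = @channels.keys if names.empty?
    names.each_with_object( {} ) do |name, dispatched|
      state, pending = @mutex.synchronize do
        channel = @channels[name]
        [ channel, channel[:queue].shift( channel[:queue].size ) ]
      end
      # Each subscriber receives its own copy of the message.
      pending.each { |msg| state[:subscribers].each { |cb| cb.call( msg.dup ) } }
      dispatched[name] = pending.size
    end
  end
end

events   = SketchEvents.new
received = []
events.subscribe( "orders" ) { |message| received << message }
events.publish( "orders", "order created" )
# received is still empty here: delivery happens only during dispatch.
events.dispatch( "runner-1" )
# received now holds a copy of the published message.
```

The separation between publish and dispatch is what makes delivery asynchronous: a publisher returns as soon as the message is queued, and subscribers see it only when a runner dispatches the channel.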