Class: Shamu::Events::ActiveRecord::Service

Inherits:
EventsService
Includes:
ChannelStats
Defined in:
lib/shamu/events/active_record/service.rb

Overview

Stores events in a database using the ActiveRecord persistence layer.

Runner IDs

A runner id is a globally unique id (it may be a UUID or a well-defined internal convention that guarantees uniqueness). The system uses the runner id to track which messages have been delivered to the subscribers hosted by that runner process. This allows dispatching to resume should the host or process die.
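As a rough illustration of the two options mentioned above, the sketch below builds a runner id either from a naming convention or from a random UUID. Both helper names (`stable_runner_id`, `random_runner_id`) are hypothetical and not part of Shamu. Because the id is what lets dispatching resume, a random UUID would presumably need to be stored and reused across restarts, while a convention-based id can simply be recomputed.

```ruby
require "securerandom"
require "socket"

# Hypothetical helper (not part of Shamu): derive a runner id from the host
# name and a logical worker name. The same worker on the same host gets the
# same id after a restart.
def stable_runner_id( worker_name, host: Socket.gethostname )
  "#{ host }::#{ worker_name }"
end

# Hypothetical helper (not part of Shamu): a random UUID. It is unique, but
# it must be persisted somewhere if the process should resume under the same
# id after a crash.
def random_runner_id
  SecureRandom.uuid
end

stable_runner_id( "mailer", host: "web-1" ) # => "web-1::mailer"
```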

Methods inherited from EventsService

bridge, create, #deserialize, #serialize

Methods inherited from Services::Service

#cache_for, #cached_lookup, #entity_list, #entity_lookup_list, #error, #find_by_lookup, #lazy_association, #lookup_association

Class Method Details

.ensure_records!

This method returns an undefined value.

Ensure that the tables are present in the database and have been initialized.



# File 'lib/shamu/events/active_record/service.rb', line 23

def self.ensure_records!
  return if @ensure_records

  @ensure_records = true
  Migration.new.migrate( :up )
end
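The guard in the listing above means the migration runs at most once per process, however often the method is called. The following sketch reproduces that pattern with stand-in classes (`FakeMigration` and `FakeService` are hypothetical, used only so the example runs without a database). Note that the flag is a plain instance variable with no synchronization, so the sketch assumes the first call happens from a single thread, for example during boot.

```ruby
# Stand-in for the real migration class; it only counts how often it runs.
class FakeMigration
  class << self
    attr_accessor :runs
  end
  self.runs = 0

  def migrate( _direction )
    self.class.runs += 1
  end
end

# Stand-in service using the same run-once guard as the listing above.
class FakeService
  def self.ensure_records!
    return if @ensure_records

    @ensure_records = true
    FakeMigration.new.migrate( :up )
  end
end

3.times { FakeService.ensure_records! }
puts FakeMigration.runs # prints 1
```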

Instance Method Details

#channel_stats(name, runner_id: nil) ⇒ Hash

Gets stats for the given channel.

Stats included in the results:

  • name: name of the channel.
  • subscribers_count: the number of subscribers.
  • queue_size: the size of the message queue.
  • dispatching: true if the channel is currently dispatching messages.

Parameters:

  • name (String)

    of the channel

  • runner_id (String) (defaults to: nil)

    if provided, only show stats for the given runner.

Returns:

  • (Hash)

    stats.



# File 'lib/shamu/events/active_record/service.rb', line 76

def channel_stats( name, runner_id: nil )
  channel = fetch_channel( name )
  queue   = Message.where( channel_id: channel[:id] )

  if runner_id && ( runner = create_runner( runner_id ) )
    if runner.last_processed_id
      queue = queue.where( Message.arel_table[ :id ].gt( runner.last_processed_id ) )
    end
  end

  {
    name: name,
    subscribers_count: channel[:subscribers].size,
    dispatching: channel[:dispatching],
    queue_size: queue.count
  }
end
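To make the `queue_size` calculation concrete, here is a minimal in-memory sketch of the same filtering, assuming messages have monotonically increasing ids. `StubMessage` and `queue_size` are hypothetical stand-ins for the ActiveRecord query in the listing: without a runner, every message on the channel counts; with a runner, only messages after its `last_processed_id` count.

```ruby
# Stand-in for a persisted message row.
StubMessage = Struct.new( :id, :channel_id )

# Hypothetical helper mirroring the query in channel_stats.
def queue_size( messages, channel_id, last_processed_id: nil )
  queue = messages.select { |message| message.channel_id == channel_id }
  # Only messages newer than the runner's bookmark are still pending.
  queue = queue.select { |message| message.id > last_processed_id } if last_processed_id
  queue.size
end

MESSAGES = [
  StubMessage.new( 1, 10 ),
  StubMessage.new( 2, 10 ),
  StubMessage.new( 3, 20 ),
  StubMessage.new( 4, 10 )
].freeze

queue_size( MESSAGES, 10 )                       # => 3
queue_size( MESSAGES, 10, last_processed_id: 2 ) # => 1
```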

#dispatch(runner_id, *names, limit: nil) ⇒ Hash<String,Integer>

Dispatch queued messages up to the given limit. The method returns once all queued messages have been dispatched or the limit is reached. A long-running process might call dispatch periodically in a loop, trapping SIGINT to shut down.

Parameters:

  • runner_id (String)

    that identifies the host and process responding to events.

  • names (Array<String>)

    of the channels to dispatch. If empty, dispatches to all subscribed channels.

  • limit (Integer) (defaults to: nil)

    the maximum number of messages to dispatch. If not given, defaults to 100.

Returns:

  • (Hash<String,Integer>)

    the number of messages actually dispatched on each channel.



# File 'lib/shamu/events/active_record/service.rb', line 64

def dispatch( runner_id, *names, limit: nil )
  fail UnknownRunnerError unless runner_id.present?
  names = channels.keys unless names.present?

  names.each_with_object( {} ) do |name, dispatched|
    state = fetch_channel( name )
    dispatched[name] = dispatch_channel( state, "#{ runner_id }::#{ name }", limit )
  end
end
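The long-running loop described for `#dispatch` could look roughly like the sketch below. Everything here is an assumption for illustration: `StubEvents` stands in for the real service, and `dispatch_until` and `run_until_interrupted` are hypothetical helpers, not part of Shamu. The loop is split from the signal handling so that the stop condition can be supplied by a block.

```ruby
# Stand-in for the events service; returns scripted per-channel counts and
# then empty hashes once the script is exhausted.
class StubEvents
  def initialize( batches )
    @batches = batches
  end

  def dispatch( _runner_id, *_names, limit: nil )
    @batches.shift || {}
  end
end

# Hypothetical helper: call dispatch repeatedly until the block returns true,
# sleeping briefly whenever nothing was dispatched.
def dispatch_until( events, runner_id, idle_sleep: 1, limit: 100 )
  totals = Hash.new( 0 )

  until yield
    dispatched = events.dispatch( runner_id, limit: limit )
    dispatched.each { |channel, count| totals[channel] += count }
    sleep idle_sleep if dispatched.values.sum.zero?
  end

  totals
end

# Hypothetical helper: run the loop until the process receives SIGINT, then
# restore whatever handler was installed before.
def run_until_interrupted( events, runner_id )
  stopping = false
  previous = Signal.trap( "INT" ) { stopping = true }
  dispatch_until( events, runner_id ) { stopping }
ensure
  Signal.trap( "INT", previous ) if previous
end
```

The trap handler only sets a flag, so a batch that is already in progress finishes before the loop exits.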

#publish(channel, message)

This method returns an undefined value.

Publish a well-defined Message to a known channel so that any client that has subscribed will receive a copy of the message to process.

Events are delivered asynchronously. There is no guarantee that a subscriber has received or processed a message.

Parameters:

  • channel (String)

    to publish to.

  • message (Message)

    to publish.



# File 'lib/shamu/events/active_record/service.rb', line 37

def publish( channel, message )
  channel_id = fetch_channel( channel )[:id]
  Message.create! channel_id: channel_id, message: serialize( message )
end

#subscribe(channel) {|message| ... }

This method returns an undefined value.

Subscribe to receive notifications of events on the named channel. Any time a publisher pushes a message, the callback will be invoked with a copy of the message.

Parameters:

  • channel (String)

    to listen to.

Yields:

  • (message)

Yield Parameters:

  • message (Message)

    a copy of the published message.



# File 'lib/shamu/events/active_record/service.rb', line 43

def subscribe( channel, &callback )
  state = fetch_channel( channel )
  mutex.synchronize do
    state[:subscribers] << callback
  end
end
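The interplay of `#subscribe`, `#publish` and `#dispatch` can be sketched with a small in-memory stand-in. `InMemoryEvents` is hypothetical and deliberately much simpler than the ActiveRecord service (no runners, no persistence). It only illustrates that publishing enqueues a message and that subscribers are invoked later, when a dispatch happens.

```ruby
# Hypothetical in-memory stand-in for an events service.
class InMemoryEvents
  def initialize
    @mutex    = Mutex.new
    @channels = Hash.new { |hash, name| hash[name] = { subscribers: [], queue: [] } }
  end

  # Register a callback for the channel.
  def subscribe( channel, &callback )
    @mutex.synchronize { @channels[channel][:subscribers] << callback }
  end

  # Enqueue only; nothing is delivered at this point.
  def publish( channel, message )
    @mutex.synchronize { @channels[channel][:queue] << message }
  end

  # Deliver up to `limit` queued messages; returns how many were delivered.
  def dispatch( channel, limit: 100 )
    batch, subscribers = @mutex.synchronize do
      state = @channels[channel]
      [ state[:queue].shift( limit ), state[:subscribers].dup ]
    end

    # Each subscriber receives its own copy of the message.
    batch.each { |message| subscribers.each { |callback| callback.call( message.dup ) } }
    batch.size
  end
end

events   = InMemoryEvents.new
received = []

events.subscribe( "orders" ) { |message| received << message }
events.publish( "orders", "order-created" )
received.empty?             # => true, delivery has not happened yet
events.dispatch( "orders" ) # => 1
received                    # => ["order-created"]
```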