Class: Shamu::Events::InMemory::Service

Inherits:
EventsService show all
Includes:
ChannelStats
Defined in:
lib/shamu/events/in_memory/service.rb

Overview

Provides an in-memory EventsService that dispatches messages to subscribers within the same process.

Messages are volitale and will be lost if the process crashes.

Direct Known Subclasses

AsyncService

Instance Method Summary collapse

Methods inherited from EventsService

bridge, create, #deserialize, #serialize

Methods inherited from Services::Service

#cache_for, #cached_lookup, #entity_list, #entity_lookup_list, #error, #find_by_lookup, #lazy_association, #lookup_association

Instance Method Details

#channel_stats(name) ⇒ Object



48
49
50
51
52
53
54
55
56
57
# File 'lib/shamu/events/in_memory/service.rb', line 48

def channel_stats( name )
  channel = fetch_channel( name )

  {
    name: name,
    subscribers_count: channel[ :subscribers ].count,
    queue_size: channel[ :queue ].size,
    dispatching: channel[ :dispatching ]
  }
end

#dispatch(*names)

This method returns an undefined value.

Dispatch all pending mssages in the given named channels.

Parameters:

  • names (Array<String>)

    of the channels to dispatch. Dispatches to all queues if empty.



39
40
41
42
43
44
45
# File 'lib/shamu/events/in_memory/service.rb', line 39

def dispatch( *names )
  names = channels.keys if names.empty?

  names.each do |name|
    dispatch_channel( fetch_channel( name ) )
  end
end

#publish(channel, message)

This method returns an undefined value.

Publish a well-defined Message to a known channel so that any client that has subscribed will receive a copy of the message to process.

Events are delivered asynchronously. There is no guarantee that a subscriber has received or processed a message.

Parameters:

  • channel (String)

    to publish to.

  • message (Message)

    to publish.



21
22
23
24
25
# File 'lib/shamu/events/in_memory/service.rb', line 21

def publish( channel, message )
  state = fetch_channel( channel )
  queue = state[ :queue ]
  queue.push serialize( message )
end

#subscribe(channel) {|message| ... }

This method returns an undefined value.

Subscribe to receive notifications of events on the named channel. Any time a publisher pushes a message callback will be invoked with a copy of the message.

Parameters:

  • channel (String)

    to listen to.

Yields:

  • (message)

Yield Parameters:



28
29
30
31
32
33
# File 'lib/shamu/events/in_memory/service.rb', line 28

def subscribe( channel, &callback )
  subscribers = fetch_channel( channel )[ :subscribers ]
  mutex.synchronize do
    subscribers << callback
  end
end