Class: Shamu::Events::InMemory::Service
- Inherits:
-
EventsService
- Object
- Services::Service
- EventsService
- Shamu::Events::InMemory::Service
- Includes:
- ChannelStats
- Defined in:
- lib/shamu/events/in_memory/service.rb
Overview
Provides an in-memory EventsService that dispatches messages to subscribers within the same process.
Messages are volitale and will be lost if the process crashes.
Direct Known Subclasses
Instance Method Summary collapse
- #channel_stats(name) ⇒ Object
-
#dispatch(*names)
Dispatch all pending mssages in the given named channels.
-
#publish(channel, message)
Publish a well-defined Message to a known channel so that any client that has subscribed will receive a copy of the message to process.
-
#subscribe(channel) {|message| ... }
Subscribe to receive notifications of events on the named channel.
Methods inherited from EventsService
bridge, create, #deserialize, #serialize
Methods inherited from Services::Service
#cache_for, #cached_lookup, #entity_list, #entity_lookup_list, #error, #find_by_lookup, #lazy_association, #lookup_association
Instance Method Details
#channel_stats(name) ⇒ Object
48 49 50 51 52 53 54 55 56 57 |
# File 'lib/shamu/events/in_memory/service.rb', line 48 def channel_stats( name ) channel = fetch_channel( name ) { name: name, subscribers_count: channel[ :subscribers ].count, queue_size: channel[ :queue ].size, dispatching: channel[ :dispatching ] } end |
#dispatch(*names)
This method returns an undefined value.
Dispatch all pending mssages in the given named channels.
39 40 41 42 43 44 45 |
# File 'lib/shamu/events/in_memory/service.rb', line 39 def dispatch( *names ) names = channels.keys if names.empty? names.each do |name| dispatch_channel( fetch_channel( name ) ) end end |
#publish(channel, message)
This method returns an undefined value.
Publish a well-defined Message to a known channel so that any client that has subscribed will receive a copy of the message to process.
Events are delivered asynchronously. There is no guarantee that a subscriber has received or processed a message.
21 22 23 24 25 |
# File 'lib/shamu/events/in_memory/service.rb', line 21 def publish( channel, ) state = fetch_channel( channel ) queue = state[ :queue ] queue.push serialize( ) end |
#subscribe(channel) {|message| ... }
This method returns an undefined value.
Subscribe to receive notifications of events on the named channel. Any
time a publisher pushes a message callback
will be invoked with a copy
of the message.
28 29 30 31 32 33 |
# File 'lib/shamu/events/in_memory/service.rb', line 28 def subscribe( channel, &callback ) subscribers = fetch_channel( channel )[ :subscribers ] mutex.synchronize do subscribers << callback end end |