Class: Shamu::Events::InMemory::Service
- Inherits:
-
EventsService
- Object
- Services::Service
- EventsService
- Shamu::Events::InMemory::Service
- Includes:
- ChannelStats
- Defined in:
- lib/shamu/events/in_memory/service.rb
Overview
Provides an in-memory EventsService that dispatches messages to subscribers within the same process.
Messages are volitale and will be lost if the process crashes.
Direct Known Subclasses
Instance Attribute Summary
Attributes inherited from Services::Service
Instance Method Summary collapse
- #channel_stats(name) ⇒ Object
-
#dispatch(*names)
Dispatch all pending mssages in the given named channels.
-
#publish(channel, message)
Publish a well-defined Message to a known channel so that any client that has subscribed will receive a copy of the message to process.
-
#subscribe(channel) {|message| ... }
Subscribe to receive notifications of events on the named channel.
Methods inherited from EventsService
bridge, create, #deserialize, #serialize
Methods inherited from Services::Service
#cache_for, #cached_lookup, #entity_list, #entity_lookup_list, #error, #find_by_lookup, #lazy_association, #lookup_association
Instance Method Details
#channel_stats(name) ⇒ Object
48 49 50 51 52 53 54 55 56 57 |
# File 'lib/shamu/events/in_memory/service.rb', line 48 def channel_stats( name ) channel = fetch_channel( name ) { name: name, subscribers_count: channel[ :subscribers ].count, queue_size: channel[ :queue ].size, dispatching: channel[ :dispatching ] } end |
#dispatch(*names)
This method returns an undefined value.
Dispatch all pending mssages in the given named channels.
39 40 41 42 43 44 45 |
# File 'lib/shamu/events/in_memory/service.rb', line 39 def dispatch( *names ) names = channels.keys if names.empty? names.each do |name| dispatch_channel( fetch_channel( name ) ) end end |
#publish(channel, message)
This method returns an undefined value.
Publish a well-defined Message to a known channel so that any client that has subscribed will receive a copy of the message to process.
Events are delivered asynchronously. There is no guarantee that a subscriber has received or processed a message.
21 22 23 24 25 |
# File 'lib/shamu/events/in_memory/service.rb', line 21 def publish( channel, ) state = fetch_channel( channel ) queue = state[ :queue ] queue.push serialize( ) end |
#subscribe(channel) {|message| ... }
This method returns an undefined value.
Subscribe to receive notifications of events on the named channel. Any
time a publisher pushes a message callback will be invoked with a copy
of the message.
28 29 30 31 32 33 |
# File 'lib/shamu/events/in_memory/service.rb', line 28 def subscribe( channel, &callback ) subscribers = fetch_channel( channel )[ :subscribers ] mutex.synchronize do subscribers << callback end end |