Class: TinyBus
- Inherits:
- Object
- Inheritance chain: Object, TinyBus
- Defined in:
- lib/tiny_bus.rb
Overview
This class implements a very simple PubSub system where:
- subscribers can subscribe via the #sub method
- subscribers can unsubscribe via the #unsub method
- msgs can enter the TinyBus via the #msg method
The messages that come into this TinyBus are assumed to be Hash-like, in the sense that they have a ‘topic’ key that can be accessed using Hash-like key access syntax. The value under the ‘topic’ key determines which subscribers receive the message.
Usage:
t = TinyBus.new
t.sub('news', <some object that responds to #msg>)
t.msg({'topic' => 'news', 'details' => 'Historic happenings!'}) # goes to 'news' subscribers
t.msg({'topic' => 'whatever', 'details' => 'Historic happenings!'}) # goes to dead letter output, or raises exception, depending on the configuration
Initialization options:
TinyBus.new(log: <filename>) # will send a copy of all successful messages to the log file (defaults to $stdout)
TinyBus.new(dead: <filename>) # will send a copy of all unsuccessful messages to the dead letter file (defaults to $stderr)
TinyBus.new(raise_on_dead: true) # strict mode for undeliverable messages, defaults to false
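The only requirement the Usage lines place on a subscriber is that it responds to #msg. A minimal sketch of such an object follows; the class name NewsCollector and its received accessor are made up for illustration and are not part of TinyBus. The subscriber is called directly so that the sketch runs without the gem loaded.

```ruby
# Hypothetical subscriber: the only documented requirement is a #msg method.
class NewsCollector
  attr_reader :received

  def initialize
    @received = []
  end

  # Called once per delivered message; the argument is the Hash-like message.
  def msg(message)
    @received << message
  end
end

collector = NewsCollector.new

# With the gem loaded, registration would follow the Usage lines above:
#   bus = TinyBus.new
#   bus.sub('news', collector)
#   bus.msg({'topic' => 'news', 'details' => 'Historic happenings!'})
# Here the subscriber is invoked directly to keep the sketch self-contained.
collector.msg({ 'topic' => 'news', 'details' => 'Historic happenings!' })

puts collector.received.first['details']
```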
Defined Under Namespace
Classes: DeadMsgError, SubscriberDoesNotMsg, SubscriptionToDotTopicError
Instance Method Summary
- #initialize(log: nil, dead: nil, raise_on_dead: false) ⇒ TinyBus
  constructor. log: if specified, it should be a valid filename; if not specified, it defaults to $stdout. dead: if specified, it should be a valid filename for dead letter logging; if not specified, it defaults to $stderr. raise_on_dead: kind of a strict mode.
- #msg(msg) ⇒ Object
  takes an incoming message and distributes it to subscribers.
- #sub(topic, subber) ⇒ Object
  adds a subscriber to a topic.
- #to_s ⇒ Object
  helpful for debugging, gives you a count of the number of messages sent to each topic, including the .dead topic, which is where messages go when there are no subscribers for a given topic.
- #unsub(topic, subber) ⇒ Object
  removes a subscriber from a topic.
Constructor Details
#initialize(log: nil, dead: nil, raise_on_dead: false) ⇒ TinyBus
log:
  if specified, it should be a valid filename
  if not specified, it defaults to $stdout
dead:
  if specified, it should be a valid filename for dead letter logging
  if not specified, it defaults to $stderr
raise_on_dead:
  kind of a strict mode. if false, a message whose topic has no
  subscribers goes to the dead file. if true, a message whose topic
  has no subscribers raises an exception.
# File 'lib/tiny_bus.rb', line 37
def initialize(log: nil, dead: nil, raise_on_dead: false)
  @subs = {}
  @stats = { '.dead' => 0 }
  @log = log ? TinyLog.new(log) : $stdout
  @dead = dead ? File.open(dead, 'a') : $stderr
  @raise_on_dead = raise_on_dead
end
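The two raise_on_dead modes can be sketched in isolation. The helper below is a hypothetical stand-in that mirrors only the routing decision described above; the names route and SketchDeadMsgError are assumptions, and a StringIO takes the place of the dead letter file so the sketch does not touch the filesystem.

```ruby
require 'stringio'

# Hypothetical error class standing in for TinyBus::DeadMsgError.
class SketchDeadMsgError < StandardError; end

# Hypothetical helper: routes one message the way the description above says.
def route(subs, message, dead:, raise_on_dead: false)
  topic = message['topic']
  subbers = subs[topic]

  if subbers
    subbers.each { |s| s.msg(message) }
    :delivered
  elsif raise_on_dead
    raise SketchDeadMsgError, "Could not deliver message to topic `#{topic}'"
  else
    dead.puts message # lenient mode: record the message as a dead letter
    :dead
  end
end

dead_io = StringIO.new

# Lenient mode (the default): the message lands in the dead letter output.
lenient = route({}, { 'topic' => 'whatever' }, dead: dead_io)

# Strict mode: the same message raises instead.
strict = begin
  route({}, { 'topic' => 'whatever' }, dead: dead_io, raise_on_dead: true)
rescue SketchDeadMsgError => e
  e.message
end

puts lenient.inspect
puts strict
```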
Instance Method Details
#msg(msg) ⇒ Object
takes an incoming message and distributes it to subscribers
this method also annotates incoming messages with two dot properties:
- .time: the current timestamp, accurate to the microsecond
- .msg_uuid: a UUID to uniquely identify this message
NOTE: it modifies the incoming msg object in place in order to avoid unnecessary object allocations
# File 'lib/tiny_bus.rb', line 72
def msg(msg)
  t = msg['topic']

  subbers = @subs[t]

  annotated = msg.merge!({
    '.time' => Time.now.utc.iso8601(6),
    '.msg_uuid' => SecureRandom.uuid
  })

  if subbers
    @stats[t] += 1
    subbers.each{|s| s.msg(annotated) }
    @log.puts annotated
  else
    if @raise_on_dead
      raise TinyBus::DeadMsgError.new("Could not deliver message to topic `#{t}'")
    else
      @stats['.dead'] += 1
      @dead.puts annotated
    end
  end
end
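The in-place annotation can be observed without the bus, using the same merge! call as the listing above. This is a sketch of that one step only; the variable names are illustrative, and the closing dup is just one way a caller might keep an unannotated copy, not something the library prescribes.

```ruby
require 'time'          # provides Time#iso8601
require 'securerandom'  # provides SecureRandom.uuid

original = { 'topic' => 'news', 'details' => 'Historic happenings!' }

# Same annotation step as in #msg: merge! mutates the receiver and returns it.
annotated = original.merge!({
  '.time'     => Time.now.utc.iso8601(6),
  '.msg_uuid' => SecureRandom.uuid
})

puts annotated.equal?(original) # the caller's Hash and the annotated Hash are one object
puts original.keys.inspect

# A caller who wants to keep an unannotated Hash could hand over a copy instead.
pristine = { 'topic' => 'news' }
sent = pristine.dup
sent.merge!({ '.msg_uuid' => SecureRandom.uuid })
puts pristine.key?('.msg_uuid')
```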
#sub(topic, subber) ⇒ Object
adds a subscriber to a topic
topics can be any string that doesn’t start with a dot (.) - dot topics are reserved for internal TinyBus usage, such as:
- .log
# File 'lib/tiny_bus.rb', line 50
def sub(topic, subber)
  raise TinyBus::SubscriptionToDotTopicError.new("Cannot subscribe to dot topic `#{topic}', because these are reserved for internal use") if topic.start_with?('.')
  raise TinyBus::SubscriberDoesNotMsg.new("The specified subscriber type `#{subber.class.inspect}' does not respond to #msg") unless subber.respond_to?(:msg)

  @subs[topic] ||= Set.new
  @subs[topic] << subber
  @stats[topic] ||= 0
end
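The two guard clauses and the Set-backed storage can be exercised with a small stand-in. SketchRegistry, its error classes, the count helper, and Listener are all hypothetical names used for illustration; only the checks themselves are taken from the listing above.

```ruby
require 'set'

# Hypothetical stand-in for the registration logic; not the TinyBus class itself.
class SketchRegistry
  class DotTopicError < StandardError; end
  class NotASubscriberError < StandardError; end

  def initialize
    @subs = {}
  end

  def sub(topic, subber)
    raise DotTopicError, "Cannot subscribe to dot topic `#{topic}'" if topic.start_with?('.')
    raise NotASubscriberError, "#{subber.class} does not respond to #msg" unless subber.respond_to?(:msg)

    (@subs[topic] ||= Set.new) << subber
  end

  # Illustrative helper: number of distinct subscribers on a topic.
  def count(topic)
    @subs.key?(topic) ? @subs[topic].size : 0
  end
end

class Listener
  def msg(message); end
end

registry = SketchRegistry.new
listener = Listener.new
registry.sub('news', listener)
registry.sub('news', listener) # the Set keeps a single entry for the same object

dot_error = begin
  registry.sub('.log', listener)
rescue SketchRegistry::DotTopicError => e
  e.class
end

type_error = begin
  registry.sub('news', Object.new) # a plain Object has no #msg method
rescue SketchRegistry::NotASubscriberError => e
  e.class
end

puts registry.count('news')
```

Because subscribers are held in a Set, subscribing the same object twice to one topic appears to result in a single delivery per message.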
#to_s ⇒ Object
helpful for debugging, gives you a count of the number of messages sent to each topic, including the .dead topic, which is where messages go when there are no subscribers for a given topic
# File 'lib/tiny_bus.rb', line 98
def to_s
  <<~DEBUG
  TinyBus stats: #{@subs.keys.length > 0 ? "\n " + @stats.keys.sort.map{|t| "#{t.rjust(12)}: #{@stats[t]}" }.join("\n ") : '<NONE>'}
  DEBUG
end
#unsub(topic, subber) ⇒ Object
removes a subscriber from a topic
# File 'lib/tiny_bus.rb', line 60
def unsub(topic, subber)
  @subs[topic]&.delete(subber)
end
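Two details of this one-liner are easy to miss, and the sketch below reproduces them with a bare Hash of Sets rather than the bus itself (variable names are illustrative). The safe-navigation operator makes unsubscribing from an unknown topic a no-op, and removing the last subscriber leaves an empty Set behind. Since an empty Set is truthy in Ruby, the `if subbers` check in #msg, as listed on this page, would seem to treat such a topic as still having subscribers rather than routing to .dead.

```ruby
require 'set'

subs = { 'news' => Set.new }
subscriber = Object.new
subs['news'] << subscriber

# Same expression as in #unsub.
subs['news']&.delete(subscriber)

# Unknown topic: &. short-circuits on nil, so nothing is raised.
unknown = subs['no-such-topic']&.delete(subscriber)

puts subs['news'].empty?
puts(subs['news'] ? 'topic still looks subscribed' : 'topic looks dead')
puts unknown.inspect
```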