Class: TinyBus
- Inherits:
- Object (inheritance chain: Object, TinyBus)
- Defined in:
- lib/tiny_bus.rb
Overview
NOTE: This library depends on the TinyLog library.
This class implements a very simple PubSub system where:
- subscribers can subscribe via the #sub method
- subscribers can unsubscribe via the #unsub method
- messages can enter the TinyBus via the #msg method
The messages that come into this TinyBus are assumed to be Hash-like, in the sense that they have a '.topic' key that can be accessed using Hash-like key access syntax. The value stored under '.topic' determines which subscribers receive the message.
Usage:
t = TinyBus.new
t.sub('news', <some object that responds to #msg>)
t.msg({'.topic' => 'news', 'details' => 'Historic happenings!'}) # goes to 'news' subscribers
t.msg({'.topic' => 'whatever', 'details' => 'Historic happenings!'}) # goes to dead letter output, or raises an exception, depending on the configuration
Initialization options:
TinyBus.new(log: <a TinyLog for output>) # will log all normal msgs in this file
TinyBus.new(dead: <a TinyLog for dead messages>) # will log all undeliverable msgs in this file
TinyBus.new(raise_on_dead: true) # strict mode for undeliverable messages, defaults to false
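The "object that responds to #msg" in the usage above can be any plain Ruby object. A minimal sketch follows; the class name Collector and its received reader are hypothetical names chosen for illustration and are not part of TinyBus.

```ruby
# Hypothetical subscriber: anything that responds to #msg qualifies.
class Collector
  attr_reader :received

  def initialize
    @received = []
  end

  # Called once for every message delivered to this subscriber.
  def msg(message)
    @received << message
  end
end

collector = Collector.new
news = { '.topic' => 'news', 'details' => 'Historic happenings!' }

# Simulates a single delivery; a bus would make this call on our behalf.
collector.msg(news) if collector.respond_to?(:msg)
```

With the library loaded, such an object would be registered via t.sub('news', collector).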
Defined Under Namespace
Classes: DeadMsgError, SubscriberDoesNotMsg
Constant Summary
- ANNOTATION_PREFIX_DEFAULT = '.'
Instance Attribute Summary
- #dead_topics ⇒ Object (readonly)
  Returns the value of attribute dead_topics.
Instance Method Summary
- #_to_subber_id(subber) ⇒ Object
- #initialize(log: nil, dead: nil, translator: nil, raise_on_dead: false, annotation_prefix: ANNOTATION_PREFIX_DEFAULT) ⇒ TinyBus (constructor)
  log: if specified, it should be a TinyLog instance; if not specified, a new TinyLog instance for $stdout is created. dead: if specified, it should be a TinyLog instance; if not specified, a new TinyLog instance for $stderr is created. translator: an optional TinyPipe instance used to translate incoming messages (e.g. annotate them with additional fields or change keys/values).
- #msg(msg, lvl = :info) ⇒ Object
  takes an incoming message and distributes it to subscribers.
- #stats ⇒ Object
  returns a #dup of the internal statistics which track the number of messages sent to each topic, the dead queue, and total messages.
- #sub(topic, subber) ⇒ Object
  adds a subscriber to a topic.
- #to_s ⇒ Object
  helpful for debugging, gives you a count of the number of messages sent to each topic, including the .dead topic, which is where messages go when there are no subscribers for a given topic.
- #unsub(topic, subber) ⇒ Object
  removes a subscriber from a topic.
Constructor Details
#initialize(log: nil, dead: nil, translator: nil, raise_on_dead: false, annotation_prefix: ANNOTATION_PREFIX_DEFAULT) ⇒ TinyBus
log:
  if specified, it should be a TinyLog instance
  if not specified, a new TinyLog instance for $stdout is created
dead:
  if specified, it should be a TinyLog instance
  if not specified, a new TinyLog instance for $stderr is created
translator:
  if specified, it should be a TinyPipe instance that translates incoming
  messages (e.g. annotates them with additional fields, or changes
  keys/values). if not specified, no translations are made on incoming
  messages other than the default annotations
  NOTE: all messages are automatically annotated with three fields:
  - .time: the Unix time at which the message was received, in Integer milliseconds
  - .msg_uuid: a unique UUID for the incoming message
  - .trace: a unique UUID for chains of messages (if not present)
raise_on_dead:
  default: false
  a strict mode for undeliverable messages. if false, messages whose
  '.topic' has no subscribers go to the dead log. if true, such messages
  raise a TinyBus::DeadMsgError.
annotation_prefix:
  default: '.'
  if specified, the annotated message attributes ('.time', '.msg_uuid', and
  '.trace') will have the dot ('.') replaced with the specified prefix text.
  per the constructor source, the same prefix is also applied to the
  '.topic' key and to the '.total' and '.dead' statistics keys
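The default annotations and the effect of annotation_prefix can be sketched without the library. The snippet below assumes that a TinyPipe simply passes the message through each lambda in order; the annotate helper is a hypothetical stand-in for that behavior, while the three lambdas mirror the ones in the constructor source.

```ruby
require 'securerandom'

# Hypothetical stand-in for TinyPipe#pipe: feed the message through each step.
def annotate(msg, prefix = '.')
  steps = [
    ->(m) { m["#{prefix}time"]     ||= (Time.now.to_f * 1000).to_i; m },
    ->(m) { m["#{prefix}msg_uuid"] ||= SecureRandom.uuid; m },
    ->(m) { m["#{prefix}trace"]    ||= SecureRandom.uuid; m }
  ]
  steps.reduce(msg) { |m, step| step.call(m) }
end

fresh = annotate({ '.topic' => 'news' })
# fresh now also carries '.time', '.msg_uuid', and '.trace'

chained = annotate({ '.topic' => 'news', '.trace' => fresh['.trace'] })
# ||= keeps the existing '.trace', so both messages share one trace id

custom = annotate({ '_topic' => 'news' }, '_')
# with prefix '_' the keys become '_time', '_msg_uuid', and '_trace'
```

Reusing the '.trace' value of an earlier message is one way a chain of related messages could be tied together, since an existing value is never overwritten.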
# File 'lib/tiny_bus.rb', line 58
def initialize(log: nil, dead: nil, translator: nil, raise_on_dead: false, annotation_prefix: ANNOTATION_PREFIX_DEFAULT)
  @subs = {}
  @dead_topics = Set.new
  @translator = translator

  @total_key = "#{annotation_prefix}total"
  @dead_key = "#{annotation_prefix}dead"
  @topic_key = "#{annotation_prefix}topic"
  @time_key = "#{annotation_prefix}time"
  @msg_uuid_key = "#{annotation_prefix}msg_uuid"
  @trace_key = "#{annotation_prefix}trace"

  @annotator = TinyPipe.new([
    ->(m){ m[@time_key] ||= (Time.now.to_f * 1000).to_i; m },
    ->(m){ m[@msg_uuid_key] ||= SecureRandom.uuid; m },
    ->(m){ m[@trace_key] ||= SecureRandom.uuid; m }
  ])

  @stats = { @total_key => 0, @dead_key => 0 }
  @log = log || TinyLog.new(filename: $stdout)
  @dead = dead || TinyLog.new(filename: $stderr)
  @raise_on_dead = raise_on_dead
end
Instance Attribute Details
#dead_topics ⇒ Object (readonly)
Returns the value of attribute dead_topics.
# File 'lib/tiny_bus.rb', line 33
def dead_topics
  @dead_topics
end
Instance Method Details
#_to_subber_id(subber) ⇒ Object
# File 'lib/tiny_bus.rb', line 110
def _to_subber_id(subber)
  "#{subber.class.name}@#{subber.object_id}"
end
#msg(msg, lvl = :info) ⇒ Object
takes an incoming message and distributes it to subscribers
msg: the incoming message to be distributed, must be a Ruby Hash
lvl (optional): the logging level
default: :info
NOTE: this method modifies the incoming msg object in place in order to avoid unnecessary object allocations
NOTE: keys that begin with a dot (.), such as '.time', are reserved for TinyBus and should not be altered by outside code; otherwise undefined behavior may result.
# File 'lib/tiny_bus.rb', line 126
def msg(msg, lvl=:info)
  msg = @annotator.pipe(msg)
  msg = @translator&.pipe(msg) || msg

  topic = msg[@topic_key]

  subbers = @subs[topic]

  @stats[topic] ||= 0
  @stats[@total_key] += 1
  @stats[topic] += 1
  if (subbers&.length || 0) > 0
    # cloning is necessary, because sending messages may result in new
    # subscribers to the same topic we're iterating over right now. in that
    # situation we would run into a RuntimeError that prevented the
    # modification of the Set we're iterating over to send messages.
    subbers.clone.each{|s| s.msg(msg) }
    @log.send(lvl, "S #{msg}")
  else
    if @raise_on_dead
      raise TinyBus::DeadMsgError.new("Could not deliver message to topic `#{topic}'")
    else
      @stats[@dead_key] += 1
      @dead_topics << topic
      @dead.send(lvl, "D #{msg}")
    end
  end
end
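The comment about cloning in the listing above can be illustrated in isolation. In the sketch below, the subscriber Set and the :late entry are hypothetical; the point is only that iterating over a copy lets the original Set grow during delivery.

```ruby
require 'set'

subbers = Set.new([:first])
delivered = []

# Iterate over a copy so the original Set may be modified mid-delivery.
subbers.clone.each do |s|
  delivered << s
  subbers << :late # stands in for a subscriber that registers another one
end

# :late has joined the Set but was not part of this delivery round.
```

A consequence of this design is that a subscriber added while a message is being delivered only starts receiving messages from the next delivery onward.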
#stats ⇒ Object
returns a #dup of the internal statistics which track the number of messages sent to each topic, the dead queue, and total messages
# File 'lib/tiny_bus.rb', line 157
def stats
  @stats.dup
end
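Because #stats returns a copy, callers can edit the result without disturbing the bus. A minimal sketch, using a hypothetical counter Hash shaped like the one built in the constructor (the counts are invented for illustration):

```ruby
# Hypothetical internal counters: '.total', '.dead', plus one entry per topic.
internal = { '.total' => 3, '.dead' => 1, 'news' => 2, 'whatever' => 1 }

snapshot = internal.dup # what #stats hands back
snapshot['news'] = 0    # caller-side change

internal['news']        # unchanged, because the copy is a separate Hash
```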
#sub(topic, subber) ⇒ Object
adds a subscriber to a topic
- topic: either a String, or an Array of Strings
- subber: an object that responds to #msg that can receive messages
# File 'lib/tiny_bus.rb', line 87
def sub(topic, subber)
  raise TinyBus::SubscriberDoesNotMsg.new("The specified subscriber type `#{subber.class.inspect}' does not respond to #msg") unless subber.respond_to?(:msg)

  topics = topic.is_a?(Array) ? topic : [topic]

  topics.each do |t|
    @dead_topics.delete(t)
    @subs[t] ||= Set.new
    @subs[t] << subber
    @stats[t] ||= 0
    msg({ @topic_key => 'sub', 'to_topic' => t, 'subber' => _to_subber_id(subber) }, 'TINYBUS-SUB')
  end
end
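The String-or-Array handling of topic comes down to one normalization step, as in the listing above. The following self-contained sketch uses a hypothetical register helper and a plain Hash of Sets standing in for the bus's internal state; it raises ArgumentError where the library raises TinyBus::SubscriberDoesNotMsg.

```ruby
require 'set'

# Hypothetical helper mirroring the normalization in #sub.
def register(subs, topic, subber)
  # The library raises TinyBus::SubscriberDoesNotMsg here; ArgumentError is a stand-in.
  raise ArgumentError, 'subscriber must respond to #msg' unless subber.respond_to?(:msg)

  topics = topic.is_a?(Array) ? topic : [topic]
  topics.each { |t| (subs[t] ||= Set.new) << subber }
  subs
end

# Any object with a #msg method will do; this one just ignores its input.
listener = Object.new
def listener.msg(_message); end

subs = {}
register(subs, 'news', listener)             # single topic
register(subs, %w[sports weather], listener) # several topics at once
register(subs, 'news', listener)             # the Set ignores the duplicate
```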
#to_s ⇒ Object
helpful for debugging, gives you a count of the number of messages sent to each topic, including the .dead topic, which is where messages go when there are no subscribers for a given topic
# File 'lib/tiny_bus.rb', line 164
def to_s
  <<~DEBUG
  TinyBus stats: #{@stats.keys.length > 0 ? "\n " + @stats.keys.sort.map{|t| "#{t.rjust(12)}: #{@stats[t]}" }.join("\n ") : '<NONE>'}
  Dead topics: [
  #{@dead_topics.sort.each_slice(3).map{|slice| slice.join(', ') }.join("\n ")}
  ]
  Topics & Subscribers:
  #{@subs.map{|topic, subbers| "#{topic}:\n #{subbers.map(&:to_s).join("\n ")}" }.join("\n ") }
  DEBUG
end
#unsub(topic, subber) ⇒ Object
removes a subscriber from a topic
# File 'lib/tiny_bus.rb', line 103
def unsub(topic, subber)
  @subs[topic]&.delete(subber)
  @dead_topics << topic if @subs[topic].empty?

  msg({ @topic_key => 'unsub', 'from_topic' => topic, 'subber' => _to_subber_id(subber) }, 'TINYBUS-UNSUB')
end
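One detail worth noting when reading the listing above: @subs[topic] is nil for a topic that never had a subscriber, so the unguarded .empty? call would raise NoMethodError in that case. A nil-safe version of the same bookkeeping might look like the sketch below, where remove, the plain Hash, and the Sets are hypothetical stand-ins rather than the library's code.

```ruby
require 'set'

# Hypothetical nil-safe variant of the bookkeeping in #unsub.
def remove(subs, dead_topics, topic, subber)
  subs[topic]&.delete(subber)
  # &. makes this check a no-op when the topic was never subscribed to
  dead_topics << topic if subs[topic]&.empty?
end

subs = { 'news' => Set[:a] }
dead_topics = Set.new

remove(subs, dead_topics, 'news', :a)    # last subscriber leaves: topic is dead
remove(subs, dead_topics, 'unknown', :a) # never subscribed: nothing happens
```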