Class: TinyBus

Inherits:
Object
Defined in:
lib/tiny_bus.rb

Overview

This class implements a very simple PubSub system where:

  • subscribers can subscribe via the #sub method

  • subscribers can unsubscribe via the #unsub method

  • messages enter the TinyBus via the #msg method

Messages passed to a TinyBus are assumed to be Hash-like: they have a ‘topic’ key that can be read with Hash-style key access, and the value of that key determines which subscribers receive the message.

Usage:

t = TinyBus.new
t.sub('news', <some object that responds to #msg>)
t.msg({'topic' => 'news', 'details' => 'Historic happenings!'})     # goes to 'news' subscribers
t.msg({'topic' => 'whatever', 'details' => 'Historic happenings!'}) # goes to dead letter output, or raises an exception, depending on the configuration
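
The usage lines above leave the subscriber unspecified. As a minimal sketch of the subscriber contract, the following defines a hypothetical `Recorder` class (not part of TinyBus) whose only obligation is a #msg method. So that the snippet runs without the gem installed, messages are routed through a hypothetical `MiniBus` stand-in that mimics only the topic lookup visible in the #msg source; it is not the real implementation and omits logging, stats and dead letter handling.

```ruby
require 'set'

# Hypothetical subscriber: anything that responds to #msg qualifies.
class Recorder
  attr_reader :received

  def initialize
    @received = []
  end

  def msg(m)
    @received << m
  end
end

# Hypothetical stand-in for TinyBus, reduced to topic-based fan-out.
class MiniBus
  def initialize
    @subs = {}
  end

  def sub(topic, subber)
    (@subs[topic] ||= Set.new) << subber
  end

  # Returns true if the topic has a subscriber set, false otherwise.
  def msg(m)
    subbers = @subs[m['topic']]
    return false unless subbers

    subbers.each { |s| s.msg(m) }
    true
  end
end

recorder = Recorder.new
bus = MiniBus.new
bus.sub('news', recorder)

delivered = bus.msg({ 'topic' => 'news', 'details' => 'Historic happenings!' })
undelivered = bus.msg({ 'topic' => 'whatever', 'details' => 'Historic happenings!' })
```

With the real class, the second call would end up in the dead letter output or raise, depending on raise_on_dead.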

Initialization options:

TinyBus.new(log: <filename>)                                         # sends a copy of every delivered message to the log file, defaults to $stdout
TinyBus.new(dead: <filename>)                                        # appends a copy of every undeliverable message to the dead letter file, defaults to $stderr
TinyBus.new(raise_on_dead: true)                                     # strict mode for undeliverable messages, defaults to false

Defined Under Namespace

Classes: DeadMsgError, SubscriberDoesNotMsg, SubscriptionToDotTopicError

Constructor Details

#initialize(log: nil, dead: nil, raise_on_dead: false) ⇒ TinyBus

log:

if specified it should be a valid filename
if not specified will default to $stdout

dead:

if specified it should be a valid filename for dead letter logging
if not specified will default to $stderr

raise_on_dead:

a strict mode for undeliverable messages. if false, messages whose topic has
no subscribers are written to the dead file. if true, such messages raise a
TinyBus::DeadMsgError instead.


# File 'lib/tiny_bus.rb', line 37

def initialize(log: nil, dead: nil, raise_on_dead: false)
  @subs = {}
  @stats = { '.dead' => 0 }
  @log = log ? TinyLog.new(log) : $stdout
  @dead = dead ? File.open(dead, 'a') : $stderr
  @raise_on_dead = raise_on_dead
end
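
The dead: option is opened with File.open(dead, 'a'), so an existing dead letter file is appended to rather than truncated. The sketch below isolates that fallback in a hypothetical helper, `open_dead_sink`, and writes to a temporary file whose name is made up for the example. It closes the handle explicitly to flush buffered output, which the constructor shown above does not do.

```ruby
require 'tmpdir'

# Mirrors the constructor's fallback: open the named file for appending,
# or fall back to $stderr when no filename is given.
def open_dead_sink(path = nil)
  path ? File.open(path, 'a') : $stderr
end

lines = nil
Dir.mktmpdir do |dir|
  path = File.join(dir, 'dead.log') # hypothetical filename

  # Two separate opens, as if two TinyBus instances used the same file.
  2.times do |i|
    sink = open_dead_sink(path)
    sink.puts({ 'topic' => 'whatever', 'n' => i })
    sink.close # flush buffered output to disk
  end

  lines = File.readlines(path)
end

default_sink = open_dead_sink
```

Both writes survive in the file because mode 'a' positions each write at the end of the existing content.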

Instance Method Details

#msg(msg) ⇒ Object

takes an incoming message and distributes it to subscribers

this method also annotates incoming messages with two dot properties:

  • .time: the current timestamp, accurate to the microsecond

  • .msg_uuid: a UUID to uniquely identify this message

NOTE: it modifies the incoming msg object in place in order to avoid unnecessary object allocations



# File 'lib/tiny_bus.rb', line 72

def msg(msg)
  t = msg['topic']
  subbers = @subs[t]

  annotated = msg.merge!({
              '.time' => Time.now.utc.iso8601(6),
              '.msg_uuid' => SecureRandom.uuid
            })

  if subbers
    @stats[t] += 1
    subbers.each{|s| s.msg(annotated) }
    @log.puts annotated
  else
    if @raise_on_dead
      raise TinyBus::DeadMsgError.new("Could not deliver message to topic `#{t}'")
    else
      @stats['.dead'] += 1
      @dead.puts annotated
    end
  end
end
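
Because #msg annotates with merge!, the hash handed in by the caller is the same object that subscribers and the log receive. The following sketch repeats only the annotation step from the listing above, outside of TinyBus, to show the in-place effect; the sample hashes are made up for illustration.

```ruby
require 'securerandom'
require 'time' # provides Time#iso8601

original = { 'topic' => 'news', 'details' => 'Historic happenings!' }

# Same annotation step as in #msg: merge! mutates and returns the receiver.
annotated = original.merge!({
  '.time' => Time.now.utc.iso8601(6),
  '.msg_uuid' => SecureRandom.uuid
})

same_object = annotated.equal?(original)

# A caller that needs its hash left untouched can hand over a copy instead.
pristine = { 'topic' => 'news' }
copy = pristine.dup.merge!({ '.time' => Time.now.utc.iso8601(6) })
```

Passing `some_hash.dup` to #msg is one way to keep the caller's hash free of the dot properties, at the cost of the extra allocation the NOTE above is trying to avoid.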

#sub(topic, subber) ⇒ Object

adds a subscriber to a topic

topics can be any string that doesn’t start with a dot (.) - dot topics are reserved for internal TinyBus usage, such as:

  • .log



# File 'lib/tiny_bus.rb', line 50

def sub(topic, subber)
  raise TinyBus::SubscriptionToDotTopicError.new("Cannot subscribe to dot topic `#{topic}', because these are reserved for internal use") if topic.start_with?('.')
  raise TinyBus::SubscriberDoesNotMsg.new("The specified subscriber type `#{subber.class.inspect}' does not respond to #msg") unless subber.respond_to?(:msg)

  @subs[topic] ||= Set.new
  @subs[topic] << subber
  @stats[topic] ||= 0
end
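
Two details of #sub are easy to miss when reading the listing: subscribers are held in a Set, so subscribing the same object twice has no additional effect, and both guard clauses are plain duck-typing checks. The sketch below reproduces those pieces outside of TinyBus; the `Listener` class is a hypothetical subscriber used only for illustration.

```ruby
require 'set'

class Listener # hypothetical subscriber
  def msg(_m); end
end

subs = {}
listener = Listener.new

# Subscribing the same object twice leaves a single entry, because the
# subscribers for a topic are held in a Set.
2.times do
  subs['news'] ||= Set.new
  subs['news'] << listener
end

# The two guard conditions from #sub, evaluated on sample inputs.
dot_topic_rejected = '.log'.start_with?('.')
string_lacks_msg = !'just a string'.respond_to?(:msg)
listener_accepts = listener.respond_to?(:msg)
```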

#to_s ⇒ Object

helpful for debugging, gives you a count of the number of messages sent to each topic, including the .dead topic, which is where messages go when there are no subscribers for a given topic
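
To get a feel for the report layout, the formatting can be tried in isolation. This sketch assumes a stats hash shaped like the one #initialize and #sub build up (topic name to counter); the sample counts are made up.

```ruby
stats = { 'news' => 2, '.dead' => 1, 'weather' => 0 }

# Topics are sorted by name and right-aligned in a 12-character column,
# so the colons line up underneath each other.
lines = stats.keys.sort.map { |t| "#{t.rjust(12)}: #{stats[t]}" }
report = "TinyBus stats: \n  " + lines.join("\n  ")

puts report
```

Since '.' sorts before letters, the .dead counter is listed first.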



# File 'lib/tiny_bus.rb', line 98

def to_s
  <<~DEBUG
  TinyBus stats: #{@subs.keys.length > 0 ? "\n  " + @stats.keys.sort.map{|t| "#{t.rjust(12)}: #{@stats[t]}" }.join("\n  ") : '<NONE>'}
  DEBUG
end

#unsub(topic, subber) ⇒ Object

removes a subscriber from a topic



# File 'lib/tiny_bus.rb', line 60

def unsub(topic, subber)
  @subs[topic]&.delete(subber)
end
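
Reading the listing, #unsub has two consequences worth knowing. The safe navigation operator makes unsubscribing from an unknown topic a silent no-op, and removing the last subscriber leaves an empty Set in place. The sketch below replays both cases on a plain hash of Sets; `Listener` is again a hypothetical subscriber.

```ruby
require 'set'

class Listener # hypothetical subscriber
  def msg(_m); end
end

subs = {}
listener = Listener.new
subs['news'] = Set.new([listener])

# Safe navigation makes unsubscribing from an unknown topic a no-op.
unknown_result = subs['sports']&.delete(listener)

# Removing the last subscriber leaves an empty Set behind.
subs['news']&.delete(listener)
leftover = subs['news']

# An empty Set is still truthy, so the `if subbers` test in #msg would
# treat 'news' as deliverable even though nobody receives the message.
treated_as_deliverable = leftover ? true : false
```

If that behaviour is unwanted, a caller could check for remaining subscribers before publishing, or the topic entry could be deleted once its Set is empty; neither is done by the code shown here.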