Class: MessageBus::Memory::Client
- Inherits:
-
Object
- Object
- MessageBus::Memory::Client
- Defined in:
- lib/message_bus/backends/memory.rb
Defined Under Namespace
Classes: Listener
Instance Method Summary collapse
- #add(channel, value) ⇒ Object
- #backlog(channel, backlog_id) ⇒ Object
- #clear_channel_backlog(channel, backlog_id, num_to_keep) ⇒ Object
- #clear_global_backlog(backlog_id, num_to_keep) ⇒ Object
- #get_value(channel, id) ⇒ Object
- #global_backlog(backlog_id) ⇒ Object
-
#initialize(config) ⇒ Client
constructor
A new instance of Client.
- #max_id(channel = nil) ⇒ Object
-
#reset! ⇒ Object
Dangerous, drops the message_bus table containing the backlog if it exists.
- #subscribe {|listener| ... } ⇒ Object
- #unsubscribe ⇒ Object
Constructor Details
#initialize(config) ⇒ Client
Returns a new instance of Client.
20 21 22 23 24 |
# File 'lib/message_bus/backends/memory.rb', line 20 def initialize(config) @mutex = Mutex.new @listeners = [] reset! end |
Instance Method Details
#add(channel, value) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/message_bus/backends/memory.rb', line 26 def add(channel, value) listeners = nil id = nil sync do id = @global_id += 1 chan(channel) << [id, value] listeners = @listeners.dup end msg = MessageBus::Message.new id, id, channel, value payload = msg.encode listeners.each{|l| l.push(payload)} id end |
#backlog(channel, backlog_id) ⇒ Object
58 59 60 |
# File 'lib/message_bus/backends/memory.rb', line 58 def backlog(channel, backlog_id) sync{chan(channel).select{|id, _| id > backlog_id}} end |
#clear_channel_backlog(channel, backlog_id, num_to_keep) ⇒ Object
52 53 54 55 56 |
# File 'lib/message_bus/backends/memory.rb', line 52 def clear_channel_backlog(channel, backlog_id, num_to_keep) oldest = backlog_id - num_to_keep sync{chan(channel).delete_if{|id, _| id <= oldest}} nil end |
#clear_global_backlog(backlog_id, num_to_keep) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/message_bus/backends/memory.rb', line 40 def clear_global_backlog(backlog_id, num_to_keep) if backlog_id > num_to_keep oldest = backlog_id - num_to_keep sync do @channels.each_value do |entries| entries.delete_if{|id, _| id <= oldest} end end nil end end |
#get_value(channel, id) ⇒ Object
70 71 72 |
# File 'lib/message_bus/backends/memory.rb', line 70 def get_value(channel, id) sync{chan(channel).find{|i, _| i == id}.last} end |
#global_backlog(backlog_id) ⇒ Object
62 63 64 65 66 67 68 |
# File 'lib/message_bus/backends/memory.rb', line 62 def global_backlog(backlog_id) sync do @channels.dup.flat_map do |channel, | .select{|id, _| id > backlog_id}.map{|id, value| [id, channel, value]} end.sort end end |
#max_id(channel = nil) ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/message_bus/backends/memory.rb', line 82 def max_id(channel=nil) if channel sync do if entry = chan(channel).last entry.first end end else sync{@global_id - 1} end || 0 end |
#reset! ⇒ Object
Dangerous, drops the message_bus table containing the backlog if it exists.
75 76 77 78 79 80 |
# File 'lib/message_bus/backends/memory.rb', line 75 def reset! sync do @global_id = 0 @channels = {} end end |
#subscribe {|listener| ... } ⇒ Object
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/message_bus/backends/memory.rb', line 94 def subscribe listener = Listener.new yield listener q = Queue.new sync do @listeners << q end listener.do_sub.call while msg = q.pop listener..call(nil, msg) end listener.do_unsub.call sync do @listeners.delete(q) end nil end |
#unsubscribe ⇒ Object
115 116 117 |
# File 'lib/message_bus/backends/memory.rb', line 115 def unsubscribe sync{@listeners.each{|l| l.push(nil)}} end |