Class: MessageBus::Memory::Client
- Inherits:
-
Object
- Object
- MessageBus::Memory::Client
- Defined in:
- lib/message_bus/backends/memory.rb
Defined Under Namespace
Classes: Listener
Instance Method Summary collapse
- #add(channel, value) ⇒ Object
- #backlog(channel, backlog_id) ⇒ Object
- #clear_channel_backlog(channel, backlog_id, num_to_keep) ⇒ Object
- #clear_global_backlog(backlog_id, num_to_keep) ⇒ Object
- #get_value(channel, id) ⇒ Object
- #global_backlog(backlog_id) ⇒ Object
-
#initialize(config) ⇒ Client
constructor
A new instance of Client.
- #max_id(channel = nil) ⇒ Object
-
#reset! ⇒ Object
Dangerous: discards the entire in-memory backlog for every channel and resets the global id counter to 0.
- #subscribe {|listener| ... } ⇒ Object
- #unsubscribe ⇒ Object
Constructor Details
#initialize(config) ⇒ Client
Returns a new instance of Client.
21 22 23 24 25 |
# File 'lib/message_bus/backends/memory.rb', line 21
# Builds a client with no registered listeners and then calls #reset! to
# initialise its state.
#
# @param config [Object] accepted for interface compatibility; this
#   constructor does not read it
def initialize(config)
  @listeners = []
  @mutex = Mutex.new
  reset!
end
Instance Method Details
#add(channel, value) ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/message_bus/backends/memory.rb', line 27
# Stores +value+ on +channel+ under the next global id, then pushes the
# encoded message to every listener queue that was registered at that moment.
#
# @param channel [Object] channel key handed to +chan+
# @param value [Object] payload stored next to the new id
# @return [Integer] the id assigned to the stored entry
def add(channel, value)
  id, queues = sync do
    @global_id += 1
    chan(channel) << [@global_id, value]
    # Snapshot the listeners so the fan-out below works on a private copy.
    [@global_id, @listeners.dup]
  end

  # Encoding and delivery happen after the sync block has returned.
  payload = MessageBus::Message.new(id, id, channel, value).encode
  queues.each { |queue| queue.push(payload) }
  id
end
#backlog(channel, backlog_id) ⇒ Object
59 60 61 |
# File 'lib/message_bus/backends/memory.rb', line 59
# Lists the entries of +channel+ whose id is strictly greater than
# +backlog_id+.
#
# @param channel [Object] channel key handed to +chan+
# @param backlog_id [Integer] exclusive lower bound for entry ids
# @return [Array<Array>] the matching +[id, value]+ pairs
def backlog(channel, backlog_id)
  sync do
    chan(channel).reject { |id, _| id <= backlog_id }
  end
end
#clear_channel_backlog(channel, backlog_id, num_to_keep) ⇒ Object
53 54 55 56 57 |
# File 'lib/message_bus/backends/memory.rb', line 53
# Removes from +channel+ every entry whose id is at or below
# +backlog_id - num_to_keep+.
#
# @param channel [Object] channel key handed to +chan+
# @param backlog_id [Integer] id the retention window is measured from
# @param num_to_keep [Integer] size of the retention window
# @return [nil]
def clear_channel_backlog(channel, backlog_id, num_to_keep)
  cutoff = backlog_id - num_to_keep
  sync do
    chan(channel).reject! { |id, _| id <= cutoff }
  end
  nil
end
#clear_global_backlog(backlog_id, num_to_keep) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/message_bus/backends/memory.rb', line 41
# Removes, from every channel, the entries whose id is at or below
# +backlog_id - num_to_keep+. Does nothing unless +backlog_id+ exceeds
# +num_to_keep+.
#
# @param backlog_id [Integer] id the retention window is measured from
# @param num_to_keep [Integer] size of the retention window
# @return [nil]
def clear_global_backlog(backlog_id, num_to_keep)
  return unless backlog_id > num_to_keep

  cutoff = backlog_id - num_to_keep
  sync do
    @channels.each_value do |entries|
      entries.reject! { |id, _| id <= cutoff }
    end
  end
  nil
end
#get_value(channel, id) ⇒ Object
71 72 73 |
# File 'lib/message_bus/backends/memory.rb', line 71
# Looks up the value stored under +id+ on +channel+.
#
# @param channel [Object] channel key handed to +chan+
# @param id [Integer] id of the entry to fetch
# @return [Object, nil] the stored value, or +nil+ when the channel holds no
#   entry with that id
def get_value(channel, id)
  sync do
    entry = chan(channel).find { |entry_id, _| entry_id == id }
    # +find+ returns nil on a miss; guard so an unknown id yields nil instead
    # of raising NoMethodError from +nil.last+.
    entry && entry.last
  end
end
#global_backlog(backlog_id) ⇒ Object
63 64 65 66 67 68 69 |
# File 'lib/message_bus/backends/memory.rb', line 63
# Collects, across all channels, the entries whose id is strictly greater
# than +backlog_id+.
#
# @param backlog_id [Integer] exclusive lower bound for entry ids
# @return [Array<Array>] +[id, channel, value]+ triples, sorted with
#   Array#sort (so ordered by id first)
def global_backlog(backlog_id)
  sync do
    # Iterate over a shallow copy of the channel map.
    @channels.dup.flat_map do |channel, entries|
      entries
        .select { |id, _| id > backlog_id }
        .map { |id, value| [id, channel, value] }
    end.sort
  end
end
#max_id(channel = nil) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/message_bus/backends/memory.rb', line 83
# Reports the highest id assigned so far, either for one channel or globally.
#
# #add uses the incremented +@global_id+ itself as the id of the new entry,
# so the global maximum is +@global_id+ (0 when nothing has been added).
#
# @param channel [Object, nil] channel key handed to +chan+; when nil the
#   global counter is reported instead
# @return [Integer] id of the channel's last entry, the global counter, or 0
#   when there is nothing stored
def max_id(channel = nil)
  sync do
    if channel
      newest = chan(channel).last
      newest ? newest.first : 0
    else
      @global_id
    end
  end
end
#reset! ⇒ Object
Dangerous: discards the entire in-memory backlog for every channel and resets the global id counter to 0.
76 77 78 79 80 81 |
# File 'lib/message_bus/backends/memory.rb', line 76
# Dangerous: rewinds the global id counter to zero and replaces the channel
# map with an empty one, discarding every stored entry.
def reset!
  sync do
    @global_id = 0
    @channels = {}
  end
end
#subscribe {|listener| ... } ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/message_bus/backends/memory.rb', line 95
# Registers a listener queue and blocks, dispatching each queued message to
# the listener, until a nil is popped from the queue (see #unsubscribe).
#
# @yieldparam listener [Listener] fresh listener for the caller to configure
#   before the loop starts
# @return [nil]
def subscribe
  listener = Listener.new
  yield listener

  q = Queue.new
  sync { @listeners << q }

  begin
    listener.do_sub.call
    # Queue#pop blocks until something is pushed; a nil ends the loop.
    while (msg = q.pop)
      # NOTE(review): accessor name reconstructed by analogy with
      # do_sub/do_unsub — confirm against Listener.
      listener.do_message.call(nil, msg)
    end
    listener.do_unsub.call
  ensure
    # Always deregister the queue, even when a callback raises, so #add does
    # not keep pushing payloads into a queue nobody reads.
    sync { @listeners.delete(q) }
  end

  nil
end
#unsubscribe ⇒ Object
116 117 118 |
# File 'lib/message_bus/backends/memory.rb', line 116
# Pushes nil onto every registered listener queue; the loop in #subscribe
# stops when it pops a nil.
def unsubscribe
  sync do
    @listeners.each { |queue| queue.push(nil) }
  end
end