Class: MessageBus::Backends::Memory::Client
- Inherits:
-
Object
- Object
- MessageBus::Backends::Memory::Client
show all
- Defined in:
- lib/message_bus/backends/memory.rb
Defined Under Namespace
Classes: Channel, Listener
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(_config) ⇒ Client
Returns a new instance of Client.
57
58
59
60
61
62
63
64
65
66
|
# File 'lib/message_bus/backends/memory.rb', line 57
def initialize(_config)
@mutex = Mutex.new
@listeners = []
@timer_thread = MessageBus::TimerThread.new
@timer_thread.on_error do |e|
logger.warn "Failed to process job: #{e} #{e.backtrace}"
end
@timer_thread.every(1) { expire }
reset!
end
|
Instance Attribute Details
#max_backlog_age ⇒ Object
Returns the value of attribute max_backlog_age.
18
19
20
|
# File 'lib/message_bus/backends/memory.rb', line 18
def max_backlog_age
@max_backlog_age
end
|
Instance Method Details
#add(channel, value, max_backlog_age:) ⇒ Object
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
# File 'lib/message_bus/backends/memory.rb', line 68
def add(channel, value, max_backlog_age:)
listeners = nil
id = nil
sync do
id = @global_id += 1
channel_object = chan(channel)
channel_object.backlog << [id, value, Time.now]
if max_backlog_age
channel_object.ttl = max_backlog_age
end
listeners = @listeners.dup
end
msg = MessageBus::Message.new id, id, channel, value
payload = msg.encode
listeners.each { |l| l.push(payload) }
id
end
|
#backlog(channel, backlog_id) ⇒ Object
110
111
112
|
# File 'lib/message_bus/backends/memory.rb', line 110
def backlog(channel, backlog_id)
sync { chan(channel).backlog.select { |id, _| id > backlog_id } }
end
|
#clear_channel_backlog(channel, backlog_id, num_to_keep) ⇒ Object
104
105
106
107
108
|
# File 'lib/message_bus/backends/memory.rb', line 104
def clear_channel_backlog(channel, backlog_id, num_to_keep)
oldest = backlog_id - num_to_keep
sync { chan(channel).backlog.delete_if { |id, _| id <= oldest } }
nil
end
|
#clear_global_backlog(backlog_id, num_to_keep) ⇒ Object
92
93
94
95
96
97
98
99
100
101
102
|
# File 'lib/message_bus/backends/memory.rb', line 92
def clear_global_backlog(backlog_id, num_to_keep)
if backlog_id > num_to_keep
oldest = backlog_id - num_to_keep
sync do
@channels.each_value do |channel|
channel.backlog.delete_if { |id, _| id <= oldest }
end
end
nil
end
end
|
#expire ⇒ Object
86
87
88
89
90
|
# File 'lib/message_bus/backends/memory.rb', line 86
def expire
sync do
@channels.delete_if { |_name, channel| channel.expired? }
end
end
|
#expire_all_backlogs! ⇒ Object
use with extreme care, will nuke all of the data
135
136
137
138
139
|
# File 'lib/message_bus/backends/memory.rb', line 135
def expire_all_backlogs!
sync do
@channels = {}
end
end
|
#get_value(channel, id) ⇒ Object
122
123
124
|
# File 'lib/message_bus/backends/memory.rb', line 122
def get_value(channel, id)
sync { chan(channel).backlog.find { |i, _| i == id }[1] }
end
|
#global_backlog(backlog_id) ⇒ Object
114
115
116
117
118
119
120
|
# File 'lib/message_bus/backends/memory.rb', line 114
def global_backlog(backlog_id)
sync do
@channels.dup.flat_map do |channel_name, channel|
channel.backlog.select { |id, _| id > backlog_id }.map { |id, value| [id, channel_name, value] }
end.sort
end
end
|
#max_id(channel = nil) ⇒ Object
141
142
143
144
145
146
147
148
149
150
151
|
# File 'lib/message_bus/backends/memory.rb', line 141
def max_id(channel = nil)
if channel
sync do
if entry = chan(channel).backlog.last
entry.first
end
end
else
sync { @global_id - 1 }
end || 0
end
|
#reset! ⇒ Object
Dangerous, drops the message_bus table containing the backlog if it exists.
127
128
129
130
131
132
|
# File 'lib/message_bus/backends/memory.rb', line 127
def reset!
sync do
@global_id = 0
@channels = {}
end
end
|
#subscribe {|listener| ... } ⇒ Object
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
|
# File 'lib/message_bus/backends/memory.rb', line 153
def subscribe
listener = Listener.new
yield listener
q = Queue.new
sync do
@listeners << q
end
listener.do_sub.call
while msg = q.pop
listener.do_message.call(nil, msg)
end
listener.do_unsub.call
sync do
@listeners.delete(q)
end
nil
end
|
#unsubscribe ⇒ Object
174
175
176
|
# File 'lib/message_bus/backends/memory.rb', line 174
def unsubscribe
sync { @listeners.each { |l| l.push(nil) } }
end
|