Class: MessageBus::Memory::ReliablePubSub
- Inherits:
-
Object
- Object
- MessageBus::Memory::ReliablePubSub
- Defined in:
- lib/message_bus/backends/memory.rb
Constant Summary collapse
- UNSUB_MESSAGE =
"$$UNSUBSCRIBE"
Instance Attribute Summary collapse
-
#clear_every ⇒ Object
Returns the value of attribute clear_every.
-
#max_backlog_size ⇒ Object
Returns the value of attribute max_backlog_size.
-
#max_global_backlog_size ⇒ Object
Returns the value of attribute max_global_backlog_size.
-
#subscribed ⇒ Object
readonly
Returns the value of attribute subscribed.
Instance Method Summary collapse
- #after_fork ⇒ Object
- #backend ⇒ Object
- #backlog(channel, last_id = nil) ⇒ Object
- #client ⇒ Object
- #get_message(channel, message_id) ⇒ Object
- #global_backlog(last_id = nil) ⇒ Object
- #global_subscribe(last_id = nil, &blk) ⇒ Object
- #global_unsubscribe ⇒ Object
-
#initialize(config = {}, max_backlog_size = 1000) ⇒ ReliablePubSub
constructor
max_backlog_size is per multiplexed channel.
- #last_id(channel) ⇒ Object
- #new_connection ⇒ Object
- #process_global_backlog(highest_id) ⇒ Object
- #publish(channel, data, opts = nil) ⇒ Object
-
#reset! ⇒ Object
Use with extreme care: this will nuke all of the data.
- #subscribe(channel, last_id = nil) ⇒ Object
Constructor Details
#initialize(config = {}, max_backlog_size = 1000) ⇒ ReliablePubSub
max_backlog_size is per multiplexed channel
138 139 140 141 142 143 144 |
# File 'lib/message_bus/backends/memory.rb', line 138
# Builds the in-memory reliable pub/sub backend.
#
# @param config [Hash] backend options. Recognised keys:
#   :max_backlog_size, :max_global_backlog_size and :clear_every.
#   The hash is stored as-is in @config.
# @param max_backlog_size [Integer] per-channel backlog cap, used when
#   config does not supply :max_backlog_size
def initialize(config = {}, max_backlog_size = 1000)
  @config = config
  @max_backlog_size = config[:max_backlog_size] || max_backlog_size
  @max_global_backlog_size = config[:max_global_backlog_size] || 2000
  # Trimming cadence: publish clears backlogs whenever the new backlog id
  # is a multiple of this value (1 means on every publish).
  @clear_every = config[:clear_every] || 1
  # Start from an explicit "not subscribed" state instead of an unset ivar.
  @subscribed = false
end
Instance Attribute Details
#clear_every ⇒ Object
Returns the value of attribute clear_every.
133 134 135 |
# File 'lib/message_bus/backends/memory.rb', line 133
# Reader for the clear_every attribute.
#
# @return [Object] the current value of @clear_every
def clear_every
  @clear_every
end
#max_backlog_size ⇒ Object
Returns the value of attribute max_backlog_size.
133 134 135 |
# File 'lib/message_bus/backends/memory.rb', line 133
# Reader for the max_backlog_size attribute.
#
# @return [Object] the current value of @max_backlog_size
def max_backlog_size
  @max_backlog_size
end
#max_global_backlog_size ⇒ Object
Returns the value of attribute max_global_backlog_size.
133 134 135 |
# File 'lib/message_bus/backends/memory.rb', line 133
# Reader for the max_global_backlog_size attribute.
#
# @return [Object] the current value of @max_global_backlog_size
def max_global_backlog_size
  @max_global_backlog_size
end
#subscribed ⇒ Object (readonly)
Returns the value of attribute subscribed.
132 133 134 |
# File 'lib/message_bus/backends/memory.rb', line 132
# Read-only view of the subscribed attribute.
#
# @return [Object] the current value of @subscribed
def subscribed
  @subscribed
end
Instance Method Details
#after_fork ⇒ Object
154 155 156 |
# File 'lib/message_bus/backends/memory.rb', line 154
# Post-fork hook; this backend does nothing here.
#
# @return [nil]
def after_fork
  nil
end
#backend ⇒ Object
150 151 152 |
# File 'lib/message_bus/backends/memory.rb', line 150
# Identifies which backend this is.
#
# @return [Symbol] always :memory
def backend
  :memory
end
#backlog(channel, last_id = nil) ⇒ Object
182 183 184 185 186 187 188 |
# File 'lib/message_bus/backends/memory.rb', line 182
# Fetches one channel's backlog from the client and wraps each entry.
#
# @param channel [Object] channel passed through to client.backlog
# @param last_id [#to_i, nil] coerced with to_i before the lookup (nil becomes 0)
# @return [Array<MessageBus::Message>] the array returned by the client,
#   rewritten in place; each entry's id is used as both the first and
#   second constructor argument
def backlog(channel, last_id = nil)
  entries = client.backlog(channel, last_id.to_i)
  entries.map! { |id, data| MessageBus::Message.new(id, id, channel, data) }
end
#client ⇒ Object
158 159 160 |
# File 'lib/message_bus/backends/memory.rb', line 158
# Lazily creates the client via new_connection and reuses it afterwards.
#
# @return [Object] the memoised client held in @client
def client
  @client = new_connection unless @client
  @client
end
#get_message(channel, message_id) ⇒ Object
200 201 202 203 204 205 206 |
# File 'lib/message_bus/backends/memory.rb', line 200
# Looks up a single message on a channel.
#
# @param channel [Object] channel passed through to client.get_value
# @param message_id [Object] id passed through to client.get_value
# @return [MessageBus::Message, nil] a message whose first and second
#   constructor arguments are both message_id, or nil when the client
#   returns no value
def get_message(channel, message_id)
  data = client.get_value(channel, message_id)
  return unless data

  MessageBus::Message.new(message_id, message_id, channel, data)
end
#global_backlog(last_id = nil) ⇒ Object
190 191 192 193 194 195 196 197 198 |
# File 'lib/message_bus/backends/memory.rb', line 190
# Fetches the cross-channel backlog from the client and wraps each entry.
#
# @param last_id [#to_i, nil] coerced once with to_i before the lookup (nil becomes 0)
# @return [Array<MessageBus::Message>] the array returned by the client,
#   rewritten in place; each entry's id is used as both the first and
#   second constructor argument
def global_backlog(last_id = nil)
  items = client.global_backlog(last_id.to_i)

  items.map! do |id, channel, data|
    MessageBus::Message.new(id, id, channel, data)
  end
end
#global_subscribe(last_id = nil, &blk) ⇒ Object
236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 |
# File 'lib/message_bus/backends/memory.rb', line 236
# Subscribes through the client and yields every message to the caller's block.
#
# When last_id is given (any non-nil value, including 0), the backlog is
# replayed through process_global_backlog as soon as the subscription is
# established; live messages already delivered by that replay are skipped.
#
# Any StandardError raised while subscribed is logged as a warning and the
# whole subscription is retried after a one-second sleep, with no retry limit.
#
# @param last_id [Integer, nil] id to replay the backlog from; nil skips the replay
# @yield [m] each message: backlog entries first, then decoded live messages
# @raise [ArgumentError] when no block is given
def global_subscribe(last_id = nil, &blk)
  raise ArgumentError unless block_given?

  highest_id = last_id

  begin
    client.subscribe do |on|
      # Global ids already yielded during the backlog replay. Set to nil once
      # the table is empty, after which live messages are yielded unchecked.
      h = {}

      on.subscribe do
        if highest_id
          process_global_backlog(highest_id) do |m|
            h[m.global_id] = true
            yield m
          end
        end
        @subscribed = true
      end

      on.unsubscribe do
        @subscribed = false
      end

      on.message do |_c, m|
        m = MessageBus::Message.decode m

        if h
          # If already yielded during the backlog replay when subscribing,
          # don't yield a duplicate copy.
          unless h.delete(m.global_id)
            h = nil if h.empty?
            yield m
          end
        else
          yield m
        end
      end
    end
  rescue => error
    MessageBus.logger.warn "#{error} subscribe failed, reconnecting in 1 second. Call stack\n#{error.backtrace.join("\n")}"
    sleep 1
    retry
  end
end
#global_unsubscribe ⇒ Object
231 232 233 234 |
# File 'lib/message_bus/backends/memory.rb', line 231
# Asks the client to unsubscribe, then records that this backend is no
# longer subscribed.
#
# @return [false] the value assigned to @subscribed
def global_unsubscribe
  client.unsubscribe
  @subscribed = false
end
#last_id(channel) ⇒ Object
178 179 180 |
# File 'lib/message_bus/backends/memory.rb', line 178
# Delegates to the client's max_id for the given channel.
#
# @param channel [Object] channel passed through to client.max_id
# @return [Object] whatever client.max_id(channel) returns
def last_id(channel)
  client.max_id(channel)
end
#new_connection ⇒ Object
146 147 148 |
# File 'lib/message_bus/backends/memory.rb', line 146
# Builds a fresh memory client from the stored configuration.
#
# @return [MessageBus::Memory::Client] a new client constructed with @config
def new_connection
  MessageBus::Memory::Client.new(@config)
end
#process_global_backlog(highest_id) ⇒ Object
218 219 220 221 222 223 224 225 226 227 228 229 |
# File 'lib/message_bus/backends/memory.rb', line 218
# Replays the global backlog after highest_id, yielding each message.
#
# If highest_id is greater than client.max_id, the replay starts from 0.
#
# @param highest_id [Integer] id to replay from; compared with `>`, so it must not be nil
# @yield [old] each message returned by global_backlog, in order
# @return [Integer] the global_id of the last message yielded, or the
#   starting id when the backlog was empty
def process_global_backlog(highest_id)
  highest_id = 0 if highest_id > client.max_id

  global_backlog(highest_id).each do |old|
    yield old
    highest_id = old.global_id
  end

  highest_id
end
#publish(channel, data, opts = nil) ⇒ Object
167 168 169 170 171 172 173 174 175 176 |
# File 'lib/message_bus/backends/memory.rb', line 167
# Adds data to a channel through the client and periodically trims backlogs.
#
# Trimming runs when the id returned by client.add is a multiple of
# clear_every; it clears the global backlog against @max_global_backlog_size
# and this channel's backlog against @max_backlog_size.
#
# @param channel [Object] channel passed through to the client
# @param data [Object] payload passed through to client.add
# @param opts [Object, nil] accepted but not used by this method
# @return [Object] the backlog id returned by client.add
def publish(channel, data, opts = nil)
  store = client
  backlog_id = store.add(channel, data)

  if (backlog_id % clear_every).zero?
    store.clear_global_backlog(backlog_id, @max_global_backlog_size)
    store.clear_channel_backlog(channel, backlog_id, @max_backlog_size)
  end

  backlog_id
end
#reset! ⇒ Object
Use with extreme care: this will nuke all of the data.
163 164 165 |
# File 'lib/message_bus/backends/memory.rb', line 163
# Delegates to the client's reset!. Use with extreme care.
#
# @return [Object] whatever client.reset! returns
def reset!
  client.reset!
end
#subscribe(channel, last_id = nil) ⇒ Object
208 209 210 211 212 213 214 215 216 |
# File 'lib/message_bus/backends/memory.rb', line 208
# Subscribes to a single channel by filtering the global subscription.
#
# Trivial implementation for now: each call opens its own global
# subscription and drops messages for other channels. Connections could be
# reduced by sharing a single global subscriber.
#
# @param channel [Object] only messages whose channel equals this are yielded
# @param last_id [Integer, nil] passed through to global_subscribe
# @yield [m] each message on the requested channel
# @raise [ArgumentError] when no block is given
def subscribe(channel, last_id = nil)
  raise ArgumentError unless block_given?

  global_subscribe(last_id) do |message|
    next unless message.channel == channel

    yield message
  end
end