Class: MessageBus::Memory::ReliablePubSub
- Inherits:
-
Object
- Object
- MessageBus::Memory::ReliablePubSub
- Defined in:
- lib/message_bus/backends/memory.rb
Constant Summary collapse
- UNSUB_MESSAGE =
"$$UNSUBSCRIBE"
Instance Attribute Summary collapse
-
#clear_every ⇒ Object
Returns the value of attribute clear_every.
-
#max_backlog_size ⇒ Object
Returns the value of attribute max_backlog_size.
-
#max_global_backlog_size ⇒ Object
Returns the value of attribute max_global_backlog_size.
-
#subscribed ⇒ Object
readonly
Returns the value of attribute subscribed.
Instance Method Summary collapse
- #after_fork ⇒ Object
- #backend ⇒ Object
- #backlog(channel, last_id = nil) ⇒ Object
- #client ⇒ Object
- #get_message(channel, message_id) ⇒ Object
- #global_backlog(last_id = nil) ⇒ Object
- #global_subscribe(last_id = nil, &blk) ⇒ Object
- #global_unsubscribe ⇒ Object
-
#initialize(config = {}, max_backlog_size = 1000) ⇒ ReliablePubSub
constructor
max_backlog_size is per multiplexed channel.
- #last_id(channel) ⇒ Object
- #new_connection ⇒ Object
- #process_global_backlog(highest_id) ⇒ Object
- #publish(channel, data, queue_in_memory = true) ⇒ Object
-
#reset! ⇒ Object
use with extreme care, will nuke all of the data.
- #subscribe(channel, last_id = nil) ⇒ Object
Constructor Details
#initialize(config = {}, max_backlog_size = 1000) ⇒ ReliablePubSub
max_backlog_size is per multiplexed channel
137 138 139 140 141 142 143 |
# File 'lib/message_bus/backends/memory.rb', line 137
# Builds an in-memory pub/sub backend.
#
# @param config [Hash] backend options; only :clear_every is read here, and
#   the same hash is handed to the client built by #new_connection
# @param max_backlog_size [Integer] backlog limit per multiplexed channel
def initialize(config = {}, max_backlog_size = 1000)
  @config = config
  @max_backlog_size = max_backlog_size
  @max_global_backlog_size = 2000
  # #publish trims the backlogs whenever the id of the message it just added
  # is a multiple of this value; the default of 1 trims on every publish.
  @clear_every = config[:clear_every] || 1
  # Start in a definite "not subscribed" state rather than an unset ivar.
  @subscribed = false
end
Instance Attribute Details
#clear_every ⇒ Object
Returns the value of attribute clear_every.
132 133 134 |
# File 'lib/message_bus/backends/memory.rb', line 132
# Reader for the trimming interval: #publish clears backlogs whenever the id
# of the message it just added is a multiple of this value.
#
# @return [Object] the current value of @clear_every
def clear_every
  @clear_every
end
#max_backlog_size ⇒ Object
Returns the value of attribute max_backlog_size.
132 133 134 |
# File 'lib/message_bus/backends/memory.rb', line 132
# Reader for the per-channel backlog limit that #publish passes to the
# client's clear_channel_backlog.
#
# @return [Object] the current value of @max_backlog_size
def max_backlog_size
  @max_backlog_size
end
#max_global_backlog_size ⇒ Object
Returns the value of attribute max_global_backlog_size.
132 133 134 |
# File 'lib/message_bus/backends/memory.rb', line 132
# Reader for the global backlog limit that #publish passes to the client's
# clear_global_backlog.
#
# @return [Object] the current value of @max_global_backlog_size
def max_global_backlog_size
  @max_global_backlog_size
end
#subscribed ⇒ Object (readonly)
Returns the value of attribute subscribed.
131 132 133 |
# File 'lib/message_bus/backends/memory.rb', line 131
# Read-only subscription flag. #global_subscribe sets it to true in its
# on-subscribe callback and to false in its on-unsubscribe callback;
# #global_unsubscribe also sets it to false.
#
# @return [Object] the current value of @subscribed
def subscribed
  @subscribed
end
Instance Method Details
#after_fork ⇒ Object
153 154 155 |
# File 'lib/message_bus/backends/memory.rb', line 153
# Fork hook; this backend does nothing here.
#
# @return [nil]
def after_fork
  nil
end
#backend ⇒ Object
149 150 151 |
# File 'lib/message_bus/backends/memory.rb', line 149
# Identifies this backend.
#
# @return [Symbol] always :memory
def backend
  :memory
end
#backlog(channel, last_id = nil) ⇒ Object
181 182 183 184 185 186 187 |
# File 'lib/message_bus/backends/memory.rb', line 181
# Fetches the backlog of one channel and wraps each entry in a
# MessageBus::Message.
#
# @param channel [Object] channel identifier, passed through to the client
# @param last_id [#to_i, nil] coerced with to_i before the lookup, so nil
#   becomes 0
# @return [Array<MessageBus::Message>] the array returned by the client,
#   converted in place (map!)
def backlog(channel, last_id = nil)
  items = client.backlog(channel, last_id.to_i)

  # Each entry is destructured as (id, data); the id is used for both of the
  # first two Message arguments.
  items.map! do |id, data|
    MessageBus::Message.new(id, id, channel, data)
  end
end
#client ⇒ Object
157 158 159 |
# File 'lib/message_bus/backends/memory.rb', line 157
# Returns the memoized client, creating it with #new_connection on first use.
#
# @return [Object] whatever #new_connection returned the first time
def client
  @client ||= new_connection
end
#get_message(channel, message_id) ⇒ Object
199 200 201 202 203 204 205 |
# File 'lib/message_bus/backends/memory.rb', line 199
# Looks up a single message by id on a channel.
#
# @param channel [Object] channel identifier, passed through to the client
# @param message_id [Object] id handed to the client's get_value
# @return [MessageBus::Message, nil] the wrapped message, or nil when the
#   client returns a falsy value for that id
def get_message(channel, message_id)
  data = client.get_value(channel, message_id)
  return nil unless data

  # The same id is used for both of the first two Message arguments.
  MessageBus::Message.new(message_id, message_id, channel, data)
end
#global_backlog(last_id = nil) ⇒ Object
189 190 191 192 193 194 195 196 197 |
# File 'lib/message_bus/backends/memory.rb', line 189
# Fetches the cross-channel backlog and wraps each entry in a
# MessageBus::Message.
#
# @param last_id [#to_i, nil] coerced with to_i before the lookup, so nil
#   becomes 0
# @return [Array<MessageBus::Message>] the array returned by the client,
#   converted in place (map!)
def global_backlog(last_id = nil)
  items = client.global_backlog(last_id.to_i)

  # Each entry is destructured as (id, channel, data).
  items.map! do |id, channel, data|
    MessageBus::Message.new(id, id, channel, data)
  end
end
#global_subscribe(last_id = nil, &blk) ⇒ Object
235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 |
# File 'lib/message_bus/backends/memory.rb', line 235
# Subscribes to every channel and yields each message to the given block.
#
# When last_id is truthy, the on-subscribe callback first replays the global
# backlog through #process_global_backlog, then live messages are delivered
# from the on-message callback.
#
# @param last_id [Object, nil] backlog position to replay from; nil/false
#   skips the replay
# @yieldparam m [MessageBus::Message] a replayed or live message
# @raise [ArgumentError] when called without a block
def global_subscribe(last_id = nil, &blk)
  raise ArgumentError unless block_given?

  highest_id = last_id

  begin
    client.subscribe do |on|
      # global_id => true for every message yielded during the replay; set to
      # nil once it has been drained so later messages skip the lookup.
      replayed = {}

      on.subscribe do
        if highest_id
          process_global_backlog(highest_id) do |m|
            replayed[m.global_id] = true
            yield m
          end
        end
        @subscribed = true
      end

      on.unsubscribe do
        @subscribed = false
      end

      on.message do |_channel, payload|
        message = MessageBus::Message.decode(payload)

        if replayed
          # A message already yielded during the replay is dropped here so
          # the caller does not see it twice.
          unless replayed.delete(message.global_id)
            replayed = nil if replayed.empty?
            yield message
          end
        else
          yield message
        end
      end
    end
  rescue => error
    # NOTE(review): this retries without any bound, and highest_id is never
    # advanced (the return value of process_global_backlog is discarded), so
    # each retry starts again from the original last_id.
    MessageBus.logger.warn "#{error} subscribe failed, reconnecting in 1 second. Call stack\n#{error.backtrace.join("\n")}"
    sleep 1
    retry
  end
end
#global_unsubscribe ⇒ Object
230 231 232 233 |
# File 'lib/message_bus/backends/memory.rb', line 230
# Asks the client to unsubscribe and clears the subscription flag.
#
# @return [false] the value assigned to @subscribed
def global_unsubscribe
  client.unsubscribe
  @subscribed = false
end
#last_id(channel) ⇒ Object
177 178 179 |
# File 'lib/message_bus/backends/memory.rb', line 177
# Returns whatever the client reports as the max id for the channel.
#
# @param channel [Object] channel identifier, passed through to the client
# @return [Object] the result of client.max_id(channel)
def last_id(channel)
  client.max_id(channel)
end
#new_connection ⇒ Object
145 146 147 |
# File 'lib/message_bus/backends/memory.rb', line 145
# Builds a fresh memory client from the stored configuration.
#
# @return [MessageBus::Memory::Client]
def new_connection
  MessageBus::Memory::Client.new(@config)
end
#process_global_backlog(highest_id) ⇒ Object
217 218 219 220 221 222 223 224 225 226 227 228 |
# File 'lib/message_bus/backends/memory.rb', line 217
# Replays the global backlog after highest_id, yielding each message.
#
# @param highest_id [Integer] position to replay from; it is compared with
#   client.max_id using >, so it must be comparable to that value
# @yieldparam old [MessageBus::Message] each backlog message in order
# @return [Object] the global_id of the last message yielded, or the starting
#   position (possibly reset to 0) when the backlog is empty
def process_global_backlog(highest_id)
  # A position beyond the client's current max id restarts the replay from 0.
  highest_id = 0 if highest_id > client.max_id

  global_backlog(highest_id).each do |old|
    yield old
    highest_id = old.global_id
  end

  highest_id
end
#publish(channel, data, queue_in_memory = true) ⇒ Object
166 167 168 169 170 171 172 173 174 175 |
# File 'lib/message_bus/backends/memory.rb', line 166
# Adds a message to the client and periodically trims the backlogs.
#
# @param channel [Object] channel identifier, passed through to the client
# @param data [Object] message payload, passed through to the client
# @param queue_in_memory [Boolean] accepted but not used by this method
# @return [Object] the id returned by client.add
def publish(channel, data, queue_in_memory = true)
  conn = client
  backlog_id = conn.add(channel, data)

  # Trim only when the new id is a multiple of clear_every.
  if (backlog_id % clear_every).zero?
    conn.clear_global_backlog(backlog_id, @max_global_backlog_size)
    conn.clear_channel_backlog(channel, backlog_id, @max_backlog_size)
  end

  backlog_id
end
#reset! ⇒ Object
use with extreme care, will nuke all of the data
162 163 164 |
# File 'lib/message_bus/backends/memory.rb', line 162
# Use with extreme care: delegates to the client's reset!, which the original
# note describes as nuking all of the data.
#
# @return [Object] whatever client.reset! returns
def reset!
  client.reset!
end
#subscribe(channel, last_id = nil) ⇒ Object
207 208 209 210 211 212 213 214 215 |
# File 'lib/message_bus/backends/memory.rb', line 207
# Subscribes to a single channel by filtering the global subscription.
#
# @param channel [Object] only messages whose channel == this value are yielded
# @param last_id [Object, nil] passed unchanged to #global_subscribe
# @yieldparam m [MessageBus::Message] each message on the requested channel
# @raise [ArgumentError] when called without a block
def subscribe(channel, last_id = nil)
  # Trivial implementation for now; connections could be cut down if there
  # were only one global subscriber.
  raise ArgumentError unless block_given?

  global_subscribe(last_id) do |m|
    yield m if m.channel == channel
  end
end