Class: MessageBus::Postgres::ReliablePubSub
- Inherits:
-
Object
- Object
- MessageBus::Postgres::ReliablePubSub
- Defined in:
- lib/message_bus/backends/postgres.rb
Constant Summary collapse
- UNSUB_MESSAGE =
"$$UNSUBSCRIBE"
Instance Attribute Summary collapse
-
#clear_every ⇒ Object
Returns the value of attribute clear_every.
-
#max_backlog_age ⇒ Object
Returns the value of attribute max_backlog_age.
-
#max_backlog_size ⇒ Object
Returns the value of attribute max_backlog_size.
-
#max_global_backlog_size ⇒ Object
Returns the value of attribute max_global_backlog_size.
-
#subscribed ⇒ Object
readonly
Returns the value of attribute subscribed.
Class Method Summary collapse
Instance Method Summary collapse
- #after_fork ⇒ Object
- #backend ⇒ Object
- #backlog(channel, last_id = nil) ⇒ Object
- #client ⇒ Object
- #get_message(channel, message_id) ⇒ Object
- #global_backlog(last_id = nil) ⇒ Object
- #global_subscribe(last_id = nil, &blk) ⇒ Object
- #global_unsubscribe ⇒ Object
-
#initialize(config = {}, max_backlog_size = 1000) ⇒ ReliablePubSub
constructor
max_backlog_size is per multiplexed channel.
- #last_id(channel) ⇒ Object
- #new_connection ⇒ Object
- #postgresql_channel_name ⇒ Object
- #process_global_backlog(highest_id) ⇒ Object
- #publish(channel, data, queue_in_memory = true) ⇒ Object
-
#reset! ⇒ Object
use with extreme care, will nuke all of the data.
- #subscribe(channel, last_id = nil) ⇒ Object
Constructor Details
#initialize(config = {}, max_backlog_size = 1000) ⇒ ReliablePubSub
max_backlog_size is per multiplexed channel
224 225 226 227 228 229 230 231 |
# File 'lib/message_bus/backends/postgres.rb', line 224
# Stores the backend configuration and the backlog limits.
#
# @param config [Hash] backend options; kept as-is in @config, and
#   :clear_every is read from it (falls back to 1 when absent)
# @param max_backlog_size [Integer] backlog limit per multiplexed channel
def initialize(config = {}, max_backlog_size = 1000)
  @config = config
  @max_backlog_size = max_backlog_size
  @max_global_backlog_size = 2000
  # after 7 days inactive backlogs will be removed
  @max_backlog_age = 604800
  @clear_every = config[:clear_every] || 1
end
Instance Attribute Details
#clear_every ⇒ Object
Returns the value of attribute clear_every.
215 216 217 |
# File 'lib/message_bus/backends/postgres.rb', line 215
# Returns the value of attribute clear_every (the modulus publish applies to
# a new backlog id to decide whether to trim the backlogs).
def clear_every
  @clear_every
end
#max_backlog_age ⇒ Object
Returns the value of attribute max_backlog_age.
215 216 217 |
# File 'lib/message_bus/backends/postgres.rb', line 215
# Returns the value of attribute max_backlog_age (the value publish hands to
# client.expire when it trims the backlogs).
def max_backlog_age
  @max_backlog_age
end
#max_backlog_size ⇒ Object
Returns the value of attribute max_backlog_size.
215 216 217 |
# File 'lib/message_bus/backends/postgres.rb', line 215
# Returns the value of attribute max_backlog_size (the per-channel limit
# publish passes to client.clear_channel_backlog).
def max_backlog_size
  @max_backlog_size
end
#max_global_backlog_size ⇒ Object
Returns the value of attribute max_global_backlog_size.
215 216 217 |
# File 'lib/message_bus/backends/postgres.rb', line 215
# Returns the value of attribute max_global_backlog_size (the limit publish
# passes to client.clear_global_backlog).
def max_global_backlog_size
  @max_global_backlog_size
end
#subscribed ⇒ Object (readonly)
Returns the value of attribute subscribed.
214 215 216 |
# File 'lib/message_bus/backends/postgres.rb', line 214
# Returns the value of attribute subscribed (read-only). global_subscribe
# sets it to true once its subscribe callback has run; the unsubscribe paths
# set it back to false.
def subscribed
  @subscribed
end
Class Method Details
.reset!(config) ⇒ Object
219 220 221 |
# File 'lib/message_bus/backends/postgres.rb', line 219
# Class-level reset: builds a throwaway MessageBus::Postgres::Client from
# +config+ and calls reset! on it.
#
# @param config [Hash] passed unchanged to MessageBus::Postgres::Client.new
# @return [Object] whatever the client's reset! returns
def self.reset!(config)
  MessageBus::Postgres::Client.new(config).reset!
end
Instance Method Details
#after_fork ⇒ Object
241 242 243 |
# File 'lib/message_bus/backends/postgres.rb', line 241
# Asks the memoized client to reconnect.
#
# @return [Object] whatever client.reconnect returns
def after_fork
  client.reconnect
end
#backend ⇒ Object
237 238 239 |
# File 'lib/message_bus/backends/postgres.rb', line 237
# Identifies this backend.
#
# @return [Symbol] always :postgres
def backend
  :postgres
end
#backlog(channel, last_id = nil) ⇒ Object
278 279 280 281 282 283 284 |
# File 'lib/message_bus/backends/postgres.rb', line 278
# Fetches the backlog of one channel and wraps every row in a
# MessageBus::Message.
#
# @param channel [Object] channel to read
# @param last_id [Object, nil] converted with to_i before being handed to
#   the client, so nil becomes 0
# @return [Array<MessageBus::Message>] the client's array, mapped in place
def backlog(channel, last_id = nil)
  items = client.backlog channel, last_id.to_i

  items.map! do |id, data|
    # the backlog id is used for both of the message's id arguments
    MessageBus::Message.new id, id, channel, data
  end
end
#client ⇒ Object
250 251 252 |
# File 'lib/message_bus/backends/postgres.rb', line 250
# Memoized client: the first call stores the result of new_connection in
# @client, later calls return that same object.
def client
  @client ||= new_connection
end
#get_message(channel, message_id) ⇒ Object
294 295 296 297 298 299 300 |
# File 'lib/message_bus/backends/postgres.rb', line 294
# Looks up a single message by id on a channel.
#
# @param channel [Object] channel to look in
# @param message_id [Object] id handed to client.get_value
# @return [MessageBus::Message, nil] the wrapped message, or nil when the
#   client returns nothing for that id
def get_message(channel, message_id)
  if data = client.get_value(channel, message_id)
    # the same id is used for both of the message's id arguments
    MessageBus::Message.new message_id, message_id, channel, data
  else
    nil
  end
end
#global_backlog(last_id = nil) ⇒ Object
286 287 288 289 290 291 292 |
# File 'lib/message_bus/backends/postgres.rb', line 286
# Fetches the backlog across all channels and wraps every row in a
# MessageBus::Message.
#
# @param last_id [Object, nil] converted with to_i before being handed to
#   the client, so nil becomes 0
# @return [Array<MessageBus::Message>] the client's array, mapped in place
def global_backlog(last_id = nil)
  items = client.global_backlog last_id.to_i

  items.map! do |id, channel, data|
    # the backlog id is used for both of the message's id arguments
    MessageBus::Message.new id, id, channel, data
  end
end
#global_subscribe(last_id = nil, &blk) ⇒ Object
330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 |
# File 'lib/message_bus/backends/postgres.rb', line 330
# Subscribes to postgresql_channel_name through the client and yields every
# decoded message to the caller's block.
#
# When +last_id+ is given, the backlog after it is replayed through
# process_global_backlog as soon as the subscribe callback fires. The ids
# replayed there are remembered so that a live copy of the same message is
# not yielded a second time.
#
# Receiving UNSUB_MESSAGE clears @subscribed and returns from this method.
# Any StandardError is logged, followed by a one second sleep and a retry.
#
# @param last_id [Integer, nil] id to replay the backlog from; nil skips the replay
# @yield [MessageBus::Message] each replayed or live message
# @raise [ArgumentError] when no block is given
#
# NOTE(review): the handler name on the `on.message` line was missing from the
# extracted listing (`on. do |c, m|`); `message` is restored by analogy with
# the subscribe/unsubscribe handlers - confirm against the source file.
def global_subscribe(last_id = nil, &blk)
  raise ArgumentError unless block_given?

  highest_id = last_id

  begin
    client.subscribe(postgresql_channel_name) do |on|
      # ids already yielded during the backlog replay; nil once there is
      # nothing left to de-duplicate
      h = {}

      on.subscribe do
        if highest_id
          process_global_backlog(highest_id) do |m|
            h[m.global_id] = true
            yield m
          end
        end
        h = nil if h.empty?
        @subscribed = true
      end

      on.unsubscribe do
        @subscribed = false
      end

      on.message do |c, m|
        if m == UNSUB_MESSAGE
          @subscribed = false
          # leaves global_subscribe itself, not just this handler
          return
        end
        m = MessageBus::Message.decode m

        # we have 3 options
        #
        # 1. message came in the correct order GREAT, just deal with it
        # 2. message came in the incorrect order COMPLICATED, wait a tiny bit and clear backlog
        # 3. message came in the incorrect order and is lower than current highest id, reset

        if h
          # If already yielded during the clear backlog when subscribing,
          # don't yield a duplicate copy.
          unless h.delete(m.global_id)
            h = nil if h.empty?
            yield m
          end
        else
          yield m
        end
      end
    end
  rescue => error
    MessageBus.logger.warn "#{error} subscribe failed, reconnecting in 1 second. Call stack\n#{error.backtrace.join("\n")}"
    sleep 1
    retry
  end
end
#global_unsubscribe ⇒ Object
325 326 327 328 |
# File 'lib/message_bus/backends/postgres.rb', line 325
# Publishes UNSUB_MESSAGE on postgresql_channel_name (the sentinel that
# global_subscribe's message handler stops on) and clears @subscribed.
#
# @return [false]
def global_unsubscribe
  client.publish(postgresql_channel_name, UNSUB_MESSAGE)
  @subscribed = false
end
#last_id(channel) ⇒ Object
274 275 276 |
# File 'lib/message_bus/backends/postgres.rb', line 274
# Returns what the client reports as the maximum id for +channel+.
#
# @param channel [Object] channel passed through to client.max_id
def last_id(channel)
  client.max_id(channel)
end
#new_connection ⇒ Object
233 234 235 |
# File 'lib/message_bus/backends/postgres.rb', line 233
# Builds a fresh MessageBus::Postgres::Client from the stored @config.
#
# @return [MessageBus::Postgres::Client]
def new_connection
  MessageBus::Postgres::Client.new(@config)
end
#postgresql_channel_name ⇒ Object
245 246 247 248 |
# File 'lib/message_bus/backends/postgres.rb', line 245
# Name of the channel this backend publishes on and subscribes to.
#
# @return [String] "_message_bus_<db>", where <db> is @config[:db] or 0
#   when that key is not set
def postgresql_channel_name
  db = @config[:db] || 0
  "_message_bus_#{db}"
end
#process_global_backlog(highest_id) ⇒ Object
312 313 314 315 316 317 318 319 320 321 322 323 |
# File 'lib/message_bus/backends/postgres.rb', line 312
# Yields every global backlog message after +highest_id+ and reports the id
# of the last one yielded.
#
# @param highest_id [Integer] id to resume from
# @yield [MessageBus::Message] each backlog message, in the order returned
#   by global_backlog
# @return [Integer] global_id of the last message yielded, or the starting
#   id (after the reset below) when the backlog is empty
def process_global_backlog(highest_id)
  # an id beyond the client's current maximum restarts the replay from 0
  if highest_id > client.max_id
    highest_id = 0
  end

  global_backlog(highest_id).each do |old|
    yield old
    highest_id = old.global_id
  end

  highest_id
end
#publish(channel, data, queue_in_memory = true) ⇒ Object
259 260 261 262 263 264 265 266 267 268 269 270 271 272 |
# File 'lib/message_bus/backends/postgres.rb', line 259
# Adds +data+ to the backlog of +channel+, publishes the encoded message on
# postgresql_channel_name and, when the new backlog id is a multiple of
# clear_every, trims the backlogs.
#
# @param channel [Object] channel to publish on
# @param data [Object] payload handed to client.add and wrapped in the message
# @param queue_in_memory [Boolean] not used by this method
# @return [Object] the backlog id returned by client.add
def publish(channel, data, queue_in_memory = true)
  client = self.client
  backlog_id = client.add(channel, data)
  msg = MessageBus::Message.new backlog_id, backlog_id, channel, data
  payload = msg.encode
  client.publish postgresql_channel_name, payload
  if backlog_id % clear_every == 0
    client.clear_global_backlog(backlog_id, @max_global_backlog_size)
    client.expire(@max_backlog_age)
    client.clear_channel_backlog(channel, backlog_id, @max_backlog_size)
  end

  backlog_id
end
#reset! ⇒ Object
use with extreme care, will nuke all of the data
255 256 257 |
# File 'lib/message_bus/backends/postgres.rb', line 255
# use with extreme care, will nuke all of the data
#
# Delegates to reset! on the memoized client.
def reset!
  client.reset!
end
#subscribe(channel, last_id = nil) ⇒ Object
302 303 304 305 306 307 308 309 310 |
# File 'lib/message_bus/backends/postgres.rb', line 302
# Subscribes to a single channel by filtering the global subscription.
#
# @param channel [Object] only messages whose channel equals this are yielded
# @param last_id [Integer, nil] passed through to global_subscribe
# @yield [MessageBus::Message] each message on +channel+
# @raise [ArgumentError] when no block is given
def subscribe(channel, last_id = nil)
  # trivial implementation for now,
  #   can cut down on connections if we only have one global subscriber
  raise ArgumentError unless block_given?

  global_subscribe(last_id) do |m|
    yield m if m.channel == channel
  end
end