Method: MessageBus::Backends::Postgres#global_subscribe
- Defined in:
- lib/message_bus/backends/postgres.rb
#global_subscribe(last_id = nil) {|message| ... } ⇒ nil
Subscribe to messages on all channels. Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will block until subscription is terminated.
343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 |
# File 'lib/message_bus/backends/postgres.rb', line 343 def global_subscribe(last_id = nil) raise ArgumentError unless block_given? highest_id = last_id begin client.subscribe(postgresql_channel_name) do |on| h = {} on.subscribe do if highest_id process_global_backlog(highest_id) do |m| h[m.global_id] = true yield m end end h = nil if h.empty? @subscribed = true end on.unsubscribe do @subscribed = false end on. do |_c, m| if m == UNSUB_MESSAGE @subscribed = false return end m = MessageBus::Message.decode m # we have 3 options # # 1. message came in the correct order GREAT, just deal with it # 2. message came in the incorrect order COMPLICATED, wait a tiny bit and clear backlog # 3. message came in the incorrect order and is lowest than current highest id, reset if h # If already yielded during the clear backlog when subscribing, # don't yield a duplicate copy. unless h.delete(m.global_id) h = nil if h.empty? yield m end else yield m end end end rescue => error @config[:logger].warn "#{error} subscribe failed, reconnecting in 1 second. Call stack\n#{error.backtrace.join("\n")}" sleep 1 retry end end |