Method: MessageBus::Backends::Postgres#global_subscribe

Defined in:
lib/message_bus/backends/postgres.rb

#global_subscribe(last_id = nil) {|message| ... } ⇒ nil

Subscribe to messages on all channels. Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will block until subscription is terminated.

Parameters:

  • last_id (#to_i) (defaults to: nil)

    the global ID of the last message that the caller received

Yields:

  • (message)

    a message-handler block

Yield Parameters:

  • message (MessageBus::Message)

    each message as it is delivered

Returns:

  • (nil)

Raises:

  • (ArgumentError)

    if no block is given


343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
# File 'lib/message_bus/backends/postgres.rb', line 343

# Subscribe to messages on all channels. When +last_id+ is given, the global
# backlog after that ID is replayed first; afterwards every message received
# on the PostgreSQL channel is decoded and yielded to the block. This call
# blocks until the unsubscribe message is received.
#
# NOTE: if the subscription fails, it is re-established after one second and
# the backlog is replayed again from the original +last_id+, so messages that
# were already delivered before the failure may be yielded a second time.
#
# @param last_id [#to_i] the global ID of the last message that the caller
#   received; when nil, no backlog is replayed
# @yield [message] a message-handler block
# @yieldparam message [MessageBus::Message] each message as it is delivered
# @raise [ArgumentError] if no block is given
# @return [nil] once the unsubscribe message has been received
def global_subscribe(last_id = nil)
  raise ArgumentError, "a message-handler block is required" unless block_given?

  begin
    client.subscribe(postgresql_channel_name) do |on|
      # Global IDs that were already yielded while replaying the backlog.
      # Set to nil once there is nothing left to de-duplicate against.
      replayed_ids = {}

      on.subscribe do
        if last_id
          process_global_backlog(last_id) do |m|
            replayed_ids[m.global_id] = true
            yield m
          end
        end
        replayed_ids = nil if replayed_ids.empty?
        @subscribed = true
      end

      on.unsubscribe do
        @subscribed = false
      end

      on.message do |_c, m|
        if m == UNSUB_MESSAGE
          @subscribed = false
          # Non-local return: leaves global_subscribe itself, ending the
          # blocking subscription.
          return
        end
        m = MessageBus::Message.decode m

        # A notification may describe a message that was already delivered
        # from the backlog replay; swallow that copy instead of yielding a
        # duplicate. Every other message is yielded as it arrives.
        if replayed_ids && replayed_ids.delete(m.global_id)
          # Drop the tracking hash as soon as every replayed ID has been
          # matched, so later messages skip the lookup entirely.
          replayed_ids = nil if replayed_ids.empty?
        else
          yield m
        end
      end
    end
  rescue => error
    # Deliberately retries forever: a subscriber is expected to keep
    # reconnecting until it is explicitly unsubscribed.
    @config[:logger].warn "#{error} subscribe failed, reconnecting in 1 second. Call stack\n#{error.backtrace.join("\n")}"
    sleep 1
    retry
  end
end