Method: MessageBus::Backends::Memory#global_subscribe

Defined in:
lib/message_bus/backends/memory.rb

#global_subscribe(last_id = nil) {|message| ... } ⇒ nil

Subscribe to messages on all channels. Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will block until subscription is terminated.

Yields:

  • (message)

    a message-handler block

Yield Parameters:

Raises:

  • (ArgumentError)


288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
# File 'lib/message_bus/backends/memory.rb', line 288

def global_subscribe(last_id = nil)
  raise ArgumentError unless block_given?

  highest_id = last_id

  begin
    client.subscribe do |on|
      h = {}

      on.subscribe do
        if highest_id
          process_global_backlog(highest_id) do |m|
            h[m.global_id] = true
            yield m
          end
        end
        @subscribed = true
      end

      on.unsubscribe do
        @subscribed = false
      end

      on.message do |_c, m|
        m = MessageBus::Message.decode m

        # we have 3 options
        #
        # 1. message came in the correct order GREAT, just deal with it
        # 2. message came in the incorrect order COMPLICATED, wait a tiny bit and clear backlog
        # 3. message came in the incorrect order and is lowest than current highest id, reset

        if h
          # If already yielded during the clear backlog when subscribing,
          # don't yield a duplicate copy.
          unless h.delete(m.global_id)
            h = nil if h.empty?
            yield m
          end
        else
          yield m
        end
      end
    end
  rescue => error
    @config[:logger].warn "#{error} subscribe failed, reconnecting in 1 second. Call stack\n#{error.backtrace.join("\n")}"
    sleep 1
    retry
  end
end