Class: MessageBus::Backends::Postgres

Inherits:
Base
  • Object
Defined in:
lib/message_bus/backends/postgres.rb

Overview

Note:

This backend diverges from the standard in Base in the following ways:

  • Does not support in-memory buffering of messages on publication

  • Does not expire backlogs until they are published to

The Postgres backend stores published messages in a single Postgres table with only global IDs, and an index on channel name and ID for fast per-channel lookup. All queries are implemented as prepared statements to reduce wire chatter during use. In addition to being stored in the table, messages are published using `pg_notify`. This lets actively subscribed message_bus servers consume published messages in real time while connected and forward them to subscribers; catch-up is performed from the backlog table.
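The storage model described above can be pictured with a small in-memory stand-in. This is only a sketch: `ToyBacklog` and its methods are hypothetical names, an Array stands in for the Postgres table, and nothing here touches `pg_notify` or the real client.

```ruby
# Hypothetical in-memory stand-in for the single backlog table.
# Each row carries one global ID; there are no per-channel counters.
class ToyBacklog
  Row = Struct.new(:id, :channel, :value)

  def initialize
    @rows = []
    @next_id = 0
  end

  # Append a row and return its global ID.
  def add(channel, value)
    @next_id += 1
    @rows << Row.new(@next_id, channel, value)
    @next_id
  end

  # Per-channel catch-up: rows on one channel with an ID greater than last_id.
  def backlog(channel, last_id = 0)
    @rows.select { |r| r.channel == channel && r.id > last_id }
  end

  # Global catch-up: rows on any channel with an ID greater than last_id.
  def global_backlog(last_id = 0)
    @rows.select { |r| r.id > last_id }
  end
end

table = ToyBacklog.new
table.add("/a", "one")   # global ID 1
table.add("/b", "two")   # global ID 2
table.add("/a", "three") # global ID 3

p table.backlog("/a").map(&:id)     # => [1, 3]
p table.global_backlog(1).map(&:id) # => [2, 3]
```

Because IDs are global, the IDs seen on a single channel are increasing but not necessarily consecutive.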

Defined Under Namespace

Classes: Client

Constant Summary

Constants inherited from Base

Base::ConcreteClassMustImplementError, Base::UNSUB_MESSAGE

Instance Attribute Summary

Attributes inherited from Base

#clear_every, #max_backlog_age, #max_backlog_size, #max_global_backlog_size, #max_in_memory_publish_backlog, #subscribed

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(config = {}, max_backlog_size = 1000) ⇒ Postgres

Returns a new instance of Postgres.

Parameters:

  • config (Hash) (defaults to: {})
  • max_backlog_size (Integer) (defaults to: 1000)

    the largest permitted size (number of messages) for per-channel backlogs; beyond this capacity, old messages will be dropped.

Options Hash (config):

  • :logger (Logger)

    a logger to which logs will be output

  • :clear_every (Integer)

    the interval, in publications, at which the backlog is cleared; publications in between do not trigger clearing

  • :backend_options (Hash)

    see PG::Connection.connect for details of which options may be provided



# File 'lib/message_bus/backends/postgres.rb', line 274

def initialize(config = {}, max_backlog_size = 1000)
  @config = config
  @max_backlog_size = max_backlog_size
  @max_global_backlog_size = 2000
  # after 7 days inactive backlogs will be removed
  @max_backlog_age = 604800
  @clear_every = config[:clear_every] || 1
  @mutex = Mutex.new
  @client = nil
end
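As a rough illustration of the options above, the following sketch builds a configuration hash and mirrors the `clear_every` fallback from the constructor. The database name, user, and the `resolved_clear_every` helper are placeholders invented for this example; the commented-out constructor call assumes the message_bus and pg gems are installed and a database is reachable.

```ruby
require "logger"

# Hypothetical configuration; dbname and user are placeholder values
# that would be handed to PG::Connection.connect.
config = {
  logger: Logger.new($stdout),
  clear_every: 50,
  backend_options: { dbname: "message_bus_dev", user: "postgres" }
}

# Hypothetical helper mirroring the constructor's fallback:
# clear_every defaults to 1 when the option is absent.
def resolved_clear_every(config)
  config[:clear_every] || 1
end

p resolved_clear_every(config) # => 50
p resolved_clear_every({})     # => 1

# With the gem loaded and a database available, construction would look like:
# backend = MessageBus::Backends::Postgres.new(config, 500)
```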

Class Method Details

.reset!(config) ⇒ Object



# File 'lib/message_bus/backends/postgres.rb', line 265

def self.reset!(config)
  MessageBus::Postgres::Client.new(config).reset!
end

Instance Method Details

#after_fork ⇒ Object

Reconnects to Postgres; used after a process fork, typically triggered by a forking webserver



# File 'lib/message_bus/backends/postgres.rb', line 287

def after_fork
  client.after_fork
end

#backlog(channel, last_id = 0) ⇒ Array<MessageBus::Message>

Get messages from a channel backlog

Parameters:

  • channel (String)

    the name of the channel in question

  • last_id (#to_i) (defaults to: 0)

    the channel-specific ID of the last message that the caller received on the specified channel

Returns:

  • (Array<MessageBus::Message>)

    all messages published to the specified channel since the specified last ID



# File 'lib/message_bus/backends/postgres.rb', line 338

def backlog(channel, last_id = 0)
  items = client.backlog channel, last_id.to_i

  items.map! do |id, data|
    MessageBus::Message.new id, id, channel, data
  end
end
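The mapping step in the listing above passes the same ID twice, since this backend only tracks global IDs. A minimal sketch of that shape, using a hypothetical `ToyMessage` struct in place of `MessageBus::Message` and made-up rows in place of query results:

```ruby
# Hypothetical stand-in for MessageBus::Message, using the same argument
# order as the constructor call in the listing above.
ToyMessage = Struct.new(:global_id, :message_id, :channel, :data)

# Rows as [id, data] pairs, the shape the listing above destructures.
rows = [[7, "hello"], [9, "world"]]

# Both the global ID and the channel-level ID are filled from the same value.
messages = rows.map { |id, data| ToyMessage.new(id, id, "/chat", data) }

p messages.map(&:message_id) # => [7, 9]
```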

#destroy ⇒ Object

Closes all open connections to the storage.



# File 'lib/message_bus/backends/postgres.rb', line 297

def destroy
  client.destroy
end

#expire_all_backlogs! ⇒ Object

This method is abstract.

Deletes all backlogs and their data. Does not delete non-backlog data that message_bus may persist, depending on the concrete backend implementation. Use with extreme caution.



# File 'lib/message_bus/backends/postgres.rb', line 302

def expire_all_backlogs!
  client.expire_all_backlogs!
end

#get_message(channel, message_id) ⇒ MessageBus::Message?

Get a specific message from a channel

Parameters:

  • channel (String)

    the name of the channel in question

  • message_id (Integer)

    the channel-specific ID of the message required

Returns:

  • (MessageBus::Message, nil)

    the requested message, or nil if it does not exist



# File 'lib/message_bus/backends/postgres.rb', line 356

def get_message(channel, message_id)
  if data = client.get_value(channel, message_id)
    MessageBus::Message.new message_id, message_id, channel, data
  else
    nil
  end
end

#global_backlog(last_id = 0) ⇒ Array<MessageBus::Message>

Get messages from the global backlog

Parameters:

  • last_id (#to_i) (defaults to: 0)

    the global ID of the last message that the caller received

Returns:

  • (Array<MessageBus::Message>)

    all messages published on any channel since the specified last ID



# File 'lib/message_bus/backends/postgres.rb', line 347

def global_backlog(last_id = 0)
  items = client.global_backlog last_id.to_i

  items.map! do |id, channel, data|
    MessageBus::Message.new id, id, channel, data
  end
end

#global_subscribe(last_id = nil) {|message| ... } ⇒ nil

Subscribe to messages on all channels. Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will block until subscription is terminated.

Parameters:

  • last_id (#to_i) (defaults to: nil)

    the global ID of the last message that the caller received

Yields:

  • (message)

    a message-handler block

Yield Parameters:

  • message (MessageBus::Message)

    each message as it is delivered

Returns:

  • (nil)

Raises:

  • (ArgumentError)

    if no block is given


# File 'lib/message_bus/backends/postgres.rb', line 382

def global_subscribe(last_id = nil)
  raise ArgumentError unless block_given?

  highest_id = last_id

  begin
    client.subscribe(postgresql_channel_name) do |on|
      h = {}

      on.subscribe do
        if highest_id
          process_global_backlog(highest_id) do |m|
            h[m.global_id] = true
            yield m
          end
        end
        h = nil if h.empty?
        @subscribed = true
      end

      on.unsubscribe do
        @subscribed = false
      end

      on.message do |_c, m|
        if m == UNSUB_MESSAGE
          @subscribed = false
          return
        end
        m = MessageBus::Message.decode m

        # we have 3 options
        #
        # 1. message came in the correct order GREAT, just deal with it
        # 2. message came in the incorrect order COMPLICATED, wait a tiny bit and clear backlog
        # 3. message came in the incorrect order and is lower than current highest id, reset

        if h
          # If already yielded during the clear backlog when subscribing,
          # don't yield a duplicate copy.
          unless h.delete(m.global_id)
            h = nil if h.empty?
            yield m
          end
        else
          yield m
        end
      end
    end
  rescue => error
    @config[:logger].warn "#{error} subscribe failed, reconnecting in 1 second. Call stack\n#{error.backtrace.join("\n")}"
    sleep 1
    retry
  end
end
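The de-duplication between backlog catch-up and live notifications in the listing above can be replayed in isolation. This is a simplified sketch, not the backend's code: `deliver` is a hypothetical helper, plain integers stand in for message global IDs, and the reconnect and unsubscribe paths are left out.

```ruby
# Hypothetical replay of the de-duplication step.
# catch_up_ids: IDs yielded from the backlog when the subscription starts.
# live_ids:     IDs that later arrive as notifications.
def deliver(catch_up_ids, live_ids)
  delivered = []
  seen = {}

  # Catch-up phase: deliver backlog messages and remember their IDs.
  catch_up_ids.each do |id|
    seen[id] = true
    delivered << id
  end
  seen = nil if seen.empty?

  # Live phase: skip notifications already delivered during catch-up.
  live_ids.each do |id|
    if seen
      unless seen.delete(id)
        # Stop tracking once every remembered ID has been matched.
        seen = nil if seen.empty?
        delivered << id
      end
    else
      delivered << id
    end
  end

  delivered
end

p deliver([4, 5], [5, 6, 4, 7]) # => [4, 5, 6, 7]
```

The hash is dropped once it is empty, so a long-running subscription does not pay for the lookup on every message after catch-up duplicates have been absorbed.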

#global_unsubscribe ⇒ Object

Causes all subscribers to the bus to unsubscribe, and terminates the local connection. Typically used to reset tests.



# File 'lib/message_bus/backends/postgres.rb', line 376

def global_unsubscribe
  client.publish(postgresql_channel_name, UNSUB_MESSAGE)
  @subscribed = false
end

#last_id(channel) ⇒ Integer

Get the ID of the last message published on a channel

Parameters:

  • channel (String)

    the name of the channel in question

Returns:

  • (Integer)

    the channel-specific ID of the last message published to the given channel



# File 'lib/message_bus/backends/postgres.rb', line 328

def last_id(channel)
  client.max_id(channel)
end

#last_ids(*channels) ⇒ Array<Integer>

Get the IDs of the last messages published on multiple channels

Parameters:

  • channels (Array<String>)

    array of channels to fetch

Returns:

  • (Array<Integer>)

    the channel-specific IDs of the last message published to each requested channel



# File 'lib/message_bus/backends/postgres.rb', line 333

def last_ids(*channels)
  client.max_ids(*channels)
end

#publish(channel, data, opts = nil) ⇒ Integer

TODO:

:queue_in_memory NOT SUPPORTED

Publishes a message to a channel

Parameters:

  • channel (String)

    the name of the channel to which the message should be published

  • data (JSON)

    some data to publish to the channel. Must be an object that can be encoded as JSON

  • opts (Hash) (defaults to: nil)

Options Hash (opts):

  • :queue_in_memory (Boolean) — default: true

    whether or not to hold the message in an in-memory buffer if publication fails, to be re-tried later

  • :max_backlog_age (Integer) — default: `self.max_backlog_age`

    the longest amount of time a message may live in a backlog before being removed, in seconds

  • :max_backlog_size (Integer) — default: `self.max_backlog_size`

    the largest permitted size (number of messages) for the channel backlog; beyond this capacity, old messages will be dropped

Returns:

  • (Integer)

    the channel-specific ID the message was given



# File 'lib/message_bus/backends/postgres.rb', line 308

def publish(channel, data, opts = nil)
  # TODO in memory queue?

  c = client
  backlog_id = c.add(channel, data)
  msg = MessageBus::Message.new backlog_id, backlog_id, channel, data
  payload = msg.encode
  c.publish postgresql_channel_name, payload
  if backlog_id && backlog_id % clear_every == 0
    max_backlog_size = (opts && opts[:max_backlog_size]) || self.max_backlog_size
    max_backlog_age = (opts && opts[:max_backlog_age]) || self.max_backlog_age
    c.clear_global_backlog(backlog_id, @max_global_backlog_size)
    c.expire(max_backlog_age)
    c.clear_channel_backlog(channel, backlog_id, max_backlog_size)
  end

  backlog_id
end
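Two details of the listing above are easy to miss: trimming only runs when the new ID is a multiple of `clear_every`, and per-call options fall back to the backend's defaults. The sketch below isolates both; `trimming_ids` and `effective_limit` are hypothetical helpers written for this illustration.

```ruby
# Hypothetical helper: which backlog IDs would trigger trimming for a given
# clear_every, following the modulo check in the listing above.
def trimming_ids(ids, clear_every)
  ids.select { |id| id % clear_every == 0 }
end

# Hypothetical helper mirroring how a per-call option overrides a default.
def effective_limit(opts, key, default)
  (opts && opts[key]) || default
end

p trimming_ids(1..10, 3) # => [3, 6, 9]
p trimming_ids(1..4, 1)  # => [1, 2, 3, 4]

p effective_limit(nil, :max_backlog_size, 1000)                      # => 1000
p effective_limit({ max_backlog_size: 10 }, :max_backlog_size, 1000) # => 10
```

With the default `clear_every` of 1, every publication triggers trimming; larger values trade backlog precision for fewer cleanup queries.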

#reset! ⇒ Object

Deletes all message_bus data from the backend. Use with extreme caution.



# File 'lib/message_bus/backends/postgres.rb', line 292

def reset!
  client.reset!
end

#subscribe(channel, last_id = nil) {|message| ... } ⇒ nil

Subscribe to messages on a particular channel. Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will block until subscription is terminated.

Parameters:

  • channel (String)

    the name of the channel to which we should subscribe

  • last_id (#to_i) (defaults to: nil)

    the channel-specific ID of the last message that the caller received on the specified channel

Yields:

  • (message)

    a message-handler block

Yield Parameters:

  • message (MessageBus::Message)

    each message as it is delivered

Returns:

  • (nil)

Raises:

  • (ArgumentError)

    if no block is given


# File 'lib/message_bus/backends/postgres.rb', line 365

def subscribe(channel, last_id = nil)
  # trivial implementation for now,
  #   can cut down on connections if we only have one global subscriber
  raise ArgumentError unless block_given?

  global_subscribe(last_id) do |m|
    yield m if m.channel == channel
  end
end
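The listing above implements per-channel subscription as a filter over the global stream. A self-contained sketch of that pattern follows; `StreamMessage`, `fake_global_subscribe`, and `fake_subscribe` are hypothetical names, and an Array replaces the live Postgres notification stream.

```ruby
# Hypothetical stand-in for a delivered message.
StreamMessage = Struct.new(:global_id, :channel, :data)

# Hypothetical global stream: yields every message, whatever its channel.
def fake_global_subscribe(messages)
  raise ArgumentError unless block_given?

  messages.each { |m| yield m }
  nil
end

# Per-channel subscription expressed as a filter over the global stream.
def fake_subscribe(messages, channel)
  raise ArgumentError unless block_given?

  fake_global_subscribe(messages) do |m|
    yield m if m.channel == channel
  end
end

stream = [
  StreamMessage.new(1, "/a", "one"),
  StreamMessage.new(2, "/b", "two"),
  StreamMessage.new(3, "/a", "three")
]

received = []
fake_subscribe(stream, "/a") { |m| received << m.data }
p received # => ["one", "three"]
```

The trade-off is that every per-channel subscriber still receives all traffic and discards what it does not need, which keeps the implementation small at the cost of extra work per subscriber.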