Class: MessageBus::Backends::Postgres

Inherits:
Base
  • Object
Defined in:
lib/message_bus/backends/postgres.rb

Overview

Note:

This backend diverges from the standard in Base in the following ways:

  • Does not support in-memory buffering of messages on publication

  • Does not expire backlogs until they are published to

The Postgres backend stores published messages in a single Postgres table with only global IDs, and an index on channel name and ID for fast per-channel lookup. All queries are implemented as prepared statements to reduce the wire-chatter during use. In addition to storage in the table, messages are published using `pg_notify`; this is used for actively subscribed message_bus servers to consume published messages in real-time while connected and forward them to subscribers, while catch-up is performed from the backlog table.
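
The storage model described above can be pictured with a small in-memory stand-in. This is only a sketch under stated assumptions: `ToyBacklog` and `Row` are hypothetical names, an Array replaces the Postgres table, and a list of blocks replaces listeners notified through `pg_notify`.

```ruby
# Toy model of the backlog table: one list of rows, each with a global ID.
# Hypothetical stand-in; the real backend keeps these rows in Postgres.
class ToyBacklog
  Row = Struct.new(:id, :channel, :value)

  def initialize
    @rows = []
    @next_id = 0
    @listeners = [] # stands in for connections listening for notifications
  end

  # Register a block to be called for every newly published row.
  def listen(&block)
    @listeners << block
  end

  # Store the row, then notify live listeners; returns the new global ID.
  def add(channel, value)
    @next_id += 1
    row = Row.new(@next_id, channel, value)
    @rows << row
    @listeners.each { |listener| listener.call(row) }
    row.id
  end

  # Catch-up for one channel: rows on that channel with id > last_id.
  def backlog(channel, last_id = 0)
    @rows.select { |r| r.channel == channel && r.id > last_id }
  end

  # Catch-up across all channels: every row with id > last_id.
  def global_backlog(last_id = 0)
    @rows.select { |r| r.id > last_id }
  end
end

store = ToyBacklog.new
live = []
store.listen { |row| live << row.id }

store.add("/chat", "hello")
store.add("/news", "headline")
store.add("/chat", "world")

chat_ids = store.backlog("/chat", 1).map(&:id)
all_ids = store.global_backlog(0).map(&:id)
```

Because IDs are global, a per-channel lookup is simply the global sequence filtered by channel name, which is the query the index on channel name and ID is meant to serve.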

Defined Under Namespace

Classes: Client

Constant Summary

Constants inherited from Base

Base::ConcreteClassMustImplementError, Base::UNSUB_MESSAGE

Instance Attribute Summary

Attributes inherited from Base

#clear_every, #max_backlog_age, #max_backlog_size, #max_global_backlog_size, #max_in_memory_publish_backlog, #subscribed

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(config = {}, max_backlog_size = 1000) ⇒ Postgres

Returns a new instance of Postgres.

Parameters:

  • config (Hash) (defaults to: {})
  • max_backlog_size (Integer) (defaults to: 1000)

    the largest permitted size (number of messages) for per-channel backlogs; beyond this capacity, old messages will be dropped.

Options Hash (config):

  • :logger (Logger)

    a logger to which logs will be output

  • :clear_every (Integer)

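    the number of publications between backlog clean-ups; backlogs are trimmed only when the ID of the newly published message is a multiple of this value (defaults to 1, meaning every publication)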

  • :backend_options (Hash)

    see PG::Connection.connect for details of which options may be provided
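
A configuration hash for this backend might look like the following. This is a hedged sketch: all connection values are placeholders, and the keys under `:backend_options` are assumed to be ordinary libpq connection parameters (`host`, `dbname`, `user`) as accepted by `PG::Connection.connect`.

```ruby
require "logger"

# Placeholder values throughout; adjust for your own database.
config = {
  logger: Logger.new($stdout),
  clear_every: 10, # trim backlogs on every 10th publication
  backend_options: {
    host: "localhost",
    dbname: "message_bus",
    user: "message_bus"
  }
}

# With the message_bus and pg gems loaded, the hash would be passed as:
#   MessageBus::Backends::Postgres.new(config, 1000)
```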



# File 'lib/message_bus/backends/postgres.rb', line 254

def initialize(config = {}, max_backlog_size = 1000)
  @config = config
  @max_backlog_size = max_backlog_size
  @max_global_backlog_size = 2000
  # after 7 days inactive backlogs will be removed
  @max_backlog_age = 604800
  @clear_every = config[:clear_every] || 1
  @mutex = Mutex.new
end

Class Method Details

.reset!(config) ⇒ Object



# File 'lib/message_bus/backends/postgres.rb', line 245

def self.reset!(config)
  MessageBus::Postgres::Client.new(config).reset!
end

Instance Method Details

#after_fork ⇒ Object

Reconnects to Postgres; used after a process fork, typically triggered by a forking webserver




# File 'lib/message_bus/backends/postgres.rb', line 266

def after_fork
  client.after_fork
end

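#backlog(channel, last_id = 0) ⇒ Array<MessageBus::Message>

Get messages from a channel backlog

Parameters:

  • channel (String)

    the name of the channel in question

  • last_id (#to_i) (defaults to: 0)

    the ID of the last message that the caller received on the specified channel

Returns:

  • (Array<MessageBus::Message>)

    all messages published to the specified channel since the specified last ID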



# File 'lib/message_bus/backends/postgres.rb', line 312

def backlog(channel, last_id = 0)
  items = client.backlog channel, last_id.to_i

  items.map! do |id, data|
    MessageBus::Message.new id, id, channel, data
  end
end

#destroy ⇒ Object

Closes all open connections to the storage.



# File 'lib/message_bus/backends/postgres.rb', line 276

def destroy
  client.destroy
end

#expire_all_backlogs! ⇒ Object

This method is abstract.

Deletes all backlogs and their data. Does not delete non-backlog data that message_bus may persist, depending on the concrete backend implementation. Use with extreme caution.



# File 'lib/message_bus/backends/postgres.rb', line 281

def expire_all_backlogs!
  client.expire_all_backlogs!
end

#get_message(channel, message_id) ⇒ MessageBus::Message?

Get a specific message from a channel

Parameters:

  • channel (String)

    the name of the channel in question

  • message_id (Integer)

    the channel-specific ID of the message required

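Returns:

  • (MessageBus::Message, nil)

    the requested message, or nil if no value is stored for that channel and ID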



# File 'lib/message_bus/backends/postgres.rb', line 330

def get_message(channel, message_id)
  if data = client.get_value(channel, message_id)
    MessageBus::Message.new message_id, message_id, channel, data
  else
    nil
  end
end

#global_backlog(last_id = 0) ⇒ Array<MessageBus::Message>

Get messages from the global backlog

Parameters:

  • last_id (#to_i) (defaults to: 0)

    the global ID of the last message that the caller received

Returns:

  • (Array<MessageBus::Message>)

    all messages published on any channel since the specified last ID



# File 'lib/message_bus/backends/postgres.rb', line 321

def global_backlog(last_id = 0)
  items = client.global_backlog last_id.to_i

  items.map! do |id, channel, data|
    MessageBus::Message.new id, id, channel, data
  end
end

#global_subscribe(last_id = nil) {|message| ... } ⇒ nil

Subscribe to messages on all channels. Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will block until subscription is terminated.

Parameters:

  • last_id (#to_i) (defaults to: nil)

    the global ID of the last message that the caller received

Yields:

  • (message)

    a message-handler block

Yield Parameters:

  • message (MessageBus::Message)

    each message as it is delivered

Returns:

  • (nil)

Raises:

  • (ArgumentError)

    if no block is given


# File 'lib/message_bus/backends/postgres.rb', line 356

def global_subscribe(last_id = nil)
  raise ArgumentError unless block_given?

  highest_id = last_id

  begin
    client.subscribe(postgresql_channel_name) do |on|
      h = {}

      on.subscribe do
        if highest_id
          process_global_backlog(highest_id) do |m|
            h[m.global_id] = true
            yield m
          end
        end
        h = nil if h.empty?
        @subscribed = true
      end

      on.unsubscribe do
        @subscribed = false
      end

      on.message do |_c, m|
        if m == UNSUB_MESSAGE
          @subscribed = false
          return
        end
        m = MessageBus::Message.decode m

        # we have 3 options
        #
        # 1. message came in the correct order GREAT, just deal with it
        # 2. message came in the incorrect order COMPLICATED, wait a tiny bit and clear backlog
        # 3. message came in the incorrect order and is lowest than current highest id, reset

        if h
          # If already yielded during the clear backlog when subscribing,
          # don't yield a duplicate copy.
          unless h.delete(m.global_id)
            h = nil if h.empty?
            yield m
          end
        else
          yield m
        end
      end
    end
  rescue => error
    @config[:logger].warn "#{error} subscribe failed, reconnecting in 1 second. Call stack\n#{error.backtrace.join("\n")}"
    sleep 1
    retry
  end
end
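
The duplicate-suppression logic in the `on.message` handler can be isolated in a small simulation. This is a sketch, not the backend's code: `deliver` is a hypothetical helper, plain integers stand in for messages, and two arrays stand in for the backlog replay and the live notification stream.

```ruby
# Replays `backlog_ids` first, then processes `live_ids`, skipping any live
# ID that was already delivered during the replay.
def deliver(backlog_ids, live_ids)
  delivered = []
  seen = {}

  # Catch-up phase: remember every ID yielded from the backlog.
  backlog_ids.each do |id|
    seen[id] = true
    delivered << id
  end
  seen = nil if seen.empty?

  # Live phase: same branching as the `if h ... else ... end` block above.
  live_ids.each do |id|
    if seen
      # Hash#delete returns the stored value (true) when the ID was replayed.
      unless seen.delete(id)
        seen = nil if seen.empty?
        delivered << id
      end
    else
      delivered << id
    end
  end

  delivered
end

result = deliver([1, 2, 3], [2, 3, 4, 5])
```

Here IDs 2 and 3 arrive twice, once from the backlog and once as live notifications, and each is delivered only once. Once the tracking hash is empty it is dropped, so later messages take the cheaper `else` path.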

#global_unsubscribe ⇒ Object

Causes all subscribers to the bus to unsubscribe, and terminates the local connection. Typically used to reset tests.



# File 'lib/message_bus/backends/postgres.rb', line 350

def global_unsubscribe
  client.publish(postgresql_channel_name, UNSUB_MESSAGE)
  @subscribed = false
end

#last_id(channel) ⇒ Integer

Get the ID of the last message published on a channel

Parameters:

  • channel (String)

    the name of the channel in question

Returns:

  • (Integer)

    the channel-specific ID of the last message published to the given channel



# File 'lib/message_bus/backends/postgres.rb', line 307

def last_id(channel)
  client.max_id(channel)
end

#publish(channel, data, opts = nil) ⇒ Integer

TODO:

:queue_in_memory NOT SUPPORTED

Publishes a message to a channel

Parameters:

  • channel (String)

    the name of the channel to which the message should be published

  • data (JSON)

    some data to publish to the channel. Must be an object that can be encoded as JSON

  • opts (Hash) (defaults to: nil)

Options Hash (opts):

  • :queue_in_memory (Boolean) — default: true

    whether or not to hold the message in an in-memory buffer if publication fails, to be re-tried later

  • :max_backlog_age (Integer) — default: `self.max_backlog_age`

    the longest amount of time a message may live in a backlog before being removed, in seconds

  • :max_backlog_size (Integer) — default: `self.max_backlog_size`

    the largest permitted size (number of messages) for the channel backlog; beyond this capacity, old messages will be dropped

Returns:

  • (Integer)

    the channel-specific ID the message was given



# File 'lib/message_bus/backends/postgres.rb', line 287

def publish(channel, data, opts = nil)
  # TODO in memory queue?

  c = client
  backlog_id = c.add(channel, data)
  msg = MessageBus::Message.new backlog_id, backlog_id, channel, data
  payload = msg.encode
  c.publish postgresql_channel_name, payload
  if backlog_id && backlog_id % clear_every == 0
    max_backlog_size = (opts && opts[:max_backlog_size]) || self.max_backlog_size
    max_backlog_age = (opts && opts[:max_backlog_age]) || self.max_backlog_age
    c.clear_global_backlog(backlog_id, @max_global_backlog_size)
    c.expire(max_backlog_age)
    c.clear_channel_backlog(channel, backlog_id, max_backlog_size)
  end

  backlog_id
end
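
The trimming condition and the option fallback used in `publish` can be tried out in isolation. A minimal sketch, assuming nothing beyond the listing above: `cleanup_due?` and `effective_limit` are hypothetical helper names that restate the `backlog_id % clear_every == 0` check and the `(opts && opts[key]) || default` pattern.

```ruby
# True when a publication with this ID should trigger backlog trimming.
def cleanup_due?(backlog_id, clear_every)
  !backlog_id.nil? && (backlog_id % clear_every).zero?
end

# Per-publication option, falling back to the backend-wide default.
def effective_limit(opts, key, default)
  (opts && opts[key]) || default
end

trigger_ids = (1..10).select { |id| cleanup_due?(id, 3) }
size_default = effective_limit(nil, :max_backlog_size, 1000)
size_override = effective_limit({ max_backlog_size: 50 }, :max_backlog_size, 1000)
```

With `clear_every` set to 3, only publications whose ID is 3, 6, or 9 would run the clean-up steps; the other publications skip them entirely, which is why backlogs are not expired until their channel is published to.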

#reset! ⇒ Object

Deletes all message_bus data from the backend. Use with extreme caution.



# File 'lib/message_bus/backends/postgres.rb', line 271

def reset!
  client.reset!
end

#subscribe(channel, last_id = nil) {|message| ... } ⇒ nil

Subscribe to messages on a particular channel. Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will block until subscription is terminated.

Parameters:

  • channel (String)

    the name of the channel to which we should subscribe

  • last_id (#to_i) (defaults to: nil)

    the channel-specific ID of the last message that the caller received on the specified channel

Yields:

  • (message)

    a message-handler block

Yield Parameters:

  • message (MessageBus::Message)

    each message as it is delivered

Returns:

  • (nil)

Raises:

  • (ArgumentError)

    if no block is given


# File 'lib/message_bus/backends/postgres.rb', line 339

def subscribe(channel, last_id = nil)
  # trivial implementation for now,
  #   can cut down on connections if we only have one global subscriber
  raise ArgumentError unless block_given?

  global_subscribe(last_id) do |m|
    yield m if m.channel == channel
  end
end
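
The filtering approach used by `subscribe` can be shown with stand-ins. This is a sketch under stated assumptions: `ToyMessage`, `each_global_message`, and `each_channel_message` are hypothetical names, and an Array replaces the blocking global subscription.

```ruby
ToyMessage = Struct.new(:global_id, :message_id, :channel, :data)

# Hypothetical stand-in for a global feed: yields every message in order.
def each_global_message(messages)
  messages.each { |m| yield m }
  nil
end

# Per-channel subscription built by filtering the global feed.
def each_channel_message(messages, channel)
  raise ArgumentError unless block_given?

  each_global_message(messages) do |m|
    yield m if m.channel == channel
  end
end

feed = [
  ToyMessage.new(1, 1, "/chat", "hello"),
  ToyMessage.new(2, 2, "/news", "headline"),
  ToyMessage.new(3, 3, "/chat", "world")
]

chat = []
each_channel_message(feed, "/chat") { |m| chat << m.data }
```

The trade-off is the one the source comment points at: every per-channel subscriber receives the full global stream and discards what it does not need, which is simple but does more work than a single shared global subscriber would.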