Method: MessageBus::Backends::Postgres#publish

Defined in:
lib/message_bus/backends/postgres.rb

#publish(channel, data, opts = nil) ⇒ Integer

TODO:

:queue_in_memory NOT SUPPORTED

Publishes a message to a channel

Parameters:

  • channel (String)

    the name of the channel to which the message should be published

  • data (JSON)

    some data to publish to the channel. Must be an object that can be encoded as JSON

  • opts (Hash) (defaults to: nil)

Options Hash (opts):

  • :queue_in_memory (Boolean) — default: true

    whether or not to hold the message in an in-memory buffer if publication fails, to be re-tried later

  • :max_backlog_age (Integer) — default: `self.max_backlog_age`

    the longest amount of time a message may live in a backlog before being removed, in seconds

  • :max_backlog_size (Integer) — default: `self.max_backlog_size`

    the largest permitted size (number of messages) for the channel backlog; beyond this capacity, old messages will be dropped

Returns:

  • (Integer)

    the channel-specific ID the message was given



288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
# File 'lib/message_bus/backends/postgres.rb', line 288

def publish(channel, data, opts = nil)
  # TODO in memory queue?

  c = client
  backlog_id = c.add(channel, data)
  msg = MessageBus::Message.new backlog_id, backlog_id, channel, data
  payload = msg.encode
  c.publish postgresql_channel_name, payload
  if backlog_id && backlog_id % clear_every == 0
    max_backlog_size = (opts && opts[:max_backlog_size]) || self.max_backlog_size
    max_backlog_age = (opts && opts[:max_backlog_age]) || self.max_backlog_age
    c.clear_global_backlog(backlog_id, @max_global_backlog_size)
    c.expire(max_backlog_age)
    c.clear_channel_backlog(channel, backlog_id, max_backlog_size)
  end

  backlog_id
end