Class: MessageBus::Backends::Base Abstract

Inherits:
Object
  • Object
show all
Defined in:
lib/message_bus/backends/base.rb

Overview

This class is abstract.

Backends provide a consistent API over a variety of options for persisting published messages. The API they present is around the publication to and reading of messages from those backlogs in a manner consistent with message_bus’ philosophy.

The heart of the message bus, a backend acts as two things:

  1. A channel multiplexer

  2. Backlog storage per-multiplexed channel.

Backends manage and expose multiple backlogs:

  • A backlog for each channel, in which messages that were published to that channel are stored.

  • A global backlog, which conceptually stores all published messages, regardless of the channel to which they were published.

Backlog storage mechanisms and schemas are up to each individual backend implementation, and some backends store messages very differently than others. It is not necessary in order to be considered a valid backend, to, for example, store each channel backlog as a separate collection. As long as the API is presented per this documentation, the backend is free to make its own storage and performance optimisations.

The concept of a per-channel backlog permits for lookups of messages in a manner that is optimised for the use case of a subscriber catching up from a message pointer, while a global backlog allows for optimising the case where another system subscribes to the firehose of messages, for example a message_bus server receiving all publications for delivery to subscribed clients.

Backends are fully responsible for maintaining their storage, including any pruning or expiration of that storage that is necessary. message_bus allows for several options for limiting the required storage capacity by either backlog size or the TTL of messages in a backlog. Backends take these settings and effect them either forcibly or by delegating to their storage mechanism.

Message which are published to message_bus have two IDs; one which they are known by in the channel-specific backlog that they are published to, and another (the “global ID”) which is unique across all channels and by which the message can be found in the global backlog. IDs are all sequential integers starting at 0.

Direct Known Subclasses

Memory, Postgres, Redis

Constant Summary collapse

ConcreteClassMustImplementError =

Raised to indicate that the concrete backend implementation does not implement part of the API

Class.new(StandardError)
UNSUB_MESSAGE =

Returns a special message published to trigger termination of backend subscriptions.

Returns:

  • (String)

    a special message published to trigger termination of backend subscriptions

"$$UNSUBSCRIBE"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config = {}, max_backlog_size = 1000) ⇒ Base

Returns a new instance of Base.

Parameters:

  • config (Hash) (defaults to: {})

    backend-specific configuration options; see the concrete class for details

  • max_backlog_size (Integer) (defaults to: 1000)

    the largest permitted size (number of messages) for per-channel backlogs; beyond this capacity, old messages will be dropped.



75
# File 'lib/message_bus/backends/base.rb', line 75

def initialize(config = {}, max_backlog_size = 1000); end

Instance Attribute Details

#clear_everyInteger

Typically, backlogs are trimmed whenever we publish to them. This setting allows some tolerance in order to improve performance.

Returns:

  • (Integer)

    the interval of publications between which the backlog will not be cleared.



69
70
71
# File 'lib/message_bus/backends/base.rb', line 69

def clear_every
  @clear_every
end

#max_backlog_ageInteger

Returns the longest amount of time a message may live in a backlog before being removed, in seconds.

Returns:

  • (Integer)

    the longest amount of time a message may live in a backlog before being removed, in seconds.



66
67
68
# File 'lib/message_bus/backends/base.rb', line 66

def max_backlog_age
  @max_backlog_age
end

#max_backlog_sizeInteger

Returns the largest permitted size (number of messages) for per-channel backlogs; beyond this capacity, old messages will be dropped.

Returns:

  • (Integer)

    the largest permitted size (number of messages) for per-channel backlogs; beyond this capacity, old messages will be dropped.



62
63
64
# File 'lib/message_bus/backends/base.rb', line 62

def max_backlog_size
  @max_backlog_size
end

#max_global_backlog_sizeInteger

Returns the largest permitted size (number of messages) for the global backlog; beyond this capacity, old messages will be dropped.

Returns:

  • (Integer)

    the largest permitted size (number of messages) for the global backlog; beyond this capacity, old messages will be dropped.



64
65
66
# File 'lib/message_bus/backends/base.rb', line 64

def max_global_backlog_size
  @max_global_backlog_size
end

#max_in_memory_publish_backlogInteger

Returns the largest permitted size (number of messages) to be held in a memory buffer when publication fails, for later re-publication.

Returns:

  • (Integer)

    the largest permitted size (number of messages) to be held in a memory buffer when publication fails, for later re-publication.



71
72
73
# File 'lib/message_bus/backends/base.rb', line 71

def max_in_memory_publish_backlog
  @max_in_memory_publish_backlog
end

#subscribedBoolean (readonly)

Returns The subscription state of the backend.

Returns:

  • (Boolean)

    The subscription state of the backend



60
61
62
# File 'lib/message_bus/backends/base.rb', line 60

def subscribed
  @subscribed
end

Instance Method Details

#after_forkObject

Performs routines specific to the backend that are necessary after a process fork, typically triggered by a forking webserver. Typically this re-opens sockets to the backend.



78
79
80
# File 'lib/message_bus/backends/base.rb', line 78

def after_fork
  raise ConcreteClassMustImplementError
end

#backlog(channel, last_id = 0) ⇒ Array<MessageBus::Message>

Get messages from a channel backlog

Parameters:

  • channel (String)

    the name of the channel in question

  • last_id (#to_i) (defaults to: 0)

    the channel-specific ID of the last message that the caller received on the specified channel

Returns:

  • (Array<MessageBus::Message>)

    all messages published to the specified channel since the specified last ID

Raises:



136
137
138
# File 'lib/message_bus/backends/base.rb', line 136

def backlog(channel, last_id = 0)
  raise ConcreteClassMustImplementError
end

#destroyObject

Closes all open connections to the storage.



88
89
90
# File 'lib/message_bus/backends/base.rb', line 88

def destroy
  raise ConcreteClassMustImplementError
end

#expire_all_backlogs!Object

This method is abstract.

Deletes all backlogs and their data. Does not delete non-backlog data that message_bus may persist, depending on the concrete backend implementation. Use with extreme caution.



94
95
96
# File 'lib/message_bus/backends/base.rb', line 94

def expire_all_backlogs!
  raise ConcreteClassMustImplementError
end

#get_message(channel, message_id) ⇒ MessageBus::Message?

Get a specific message from a channel

Parameters:

  • channel (String)

    the name of the channel in question

  • message_id (Integer)

    the channel-specific ID of the message required

Returns:

Raises:



155
156
157
# File 'lib/message_bus/backends/base.rb', line 155

def get_message(channel, message_id)
  raise ConcreteClassMustImplementError
end

#global_backlog(last_id = 0) ⇒ Array<MessageBus::Message>

Get messages from the global backlog

Parameters:

  • last_id (#to_i) (defaults to: 0)

    the global ID of the last message that the caller received

Returns:

  • (Array<MessageBus::Message>)

    all messages published on any channel since the specified last ID

Raises:



145
146
147
# File 'lib/message_bus/backends/base.rb', line 145

def global_backlog(last_id = 0)
  raise ConcreteClassMustImplementError
end

#global_subscribe(last_id = nil) {|message| ... } ⇒ nil

Subscribe to messages on all channels. Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will block until subscription is terminated.

Parameters:

  • last_id (#to_i) (defaults to: nil)

    the global ID of the last message that the caller received

Yields:

  • (message)

    a message-handler block

Yield Parameters:

Returns:

  • (nil)

Raises:



189
190
191
# File 'lib/message_bus/backends/base.rb', line 189

def global_subscribe(last_id = nil)
  raise ConcreteClassMustImplementError
end

#global_unsubscribeObject

Causes all subscribers to the bus to unsubscribe, and terminates the local connection. Typically used to reset tests.



175
176
177
# File 'lib/message_bus/backends/base.rb', line 175

def global_unsubscribe
  raise ConcreteClassMustImplementError
end

#last_id(channel) ⇒ Integer

Get the ID of the last message published on a channel

Parameters:

  • channel (String)

    the name of the channel in question

Returns:

  • (Integer)

    the channel-specific ID of the last message published to the given channel

Raises:



117
118
119
# File 'lib/message_bus/backends/base.rb', line 117

def last_id(channel)
  raise ConcreteClassMustImplementError
end

#last_ids(*channels) ⇒ Array<Integer>

Get the ID of the last message published on multiple channels

Parameters:

  • channels (Array<String>)
    • array of channels to fetch

Returns:

  • (Array<Integer>)

    the channel-specific IDs of the last message published to each requested channel

Raises:



126
127
128
# File 'lib/message_bus/backends/base.rb', line 126

def last_ids(*channels)
  raise ConcreteClassMustImplementError
end

#publish(channel, data, opts = nil) ⇒ Integer

Publishes a message to a channel

Parameters:

  • channel (String)

    the name of the channel to which the message should be published

  • data (JSON)

    some data to publish to the channel. Must be an object that can be encoded as JSON

  • opts (Hash) (defaults to: nil)

Options Hash (opts):

  • :queue_in_memory (Boolean) — default: true

    whether or not to hold the message in an in-memory buffer if publication fails, to be re-tried later

  • :max_backlog_age (Integer) — default: `self.max_backlog_age`

    the longest amount of time a message may live in a backlog before being removed, in seconds

  • :max_backlog_size (Integer) — default: `self.max_backlog_size`

    the largest permitted size (number of messages) for the channel backlog; beyond this capacity, old messages will be dropped

Returns:

  • (Integer)

    the channel-specific ID the message was given

Raises:



108
109
110
# File 'lib/message_bus/backends/base.rb', line 108

def publish(channel, data, opts = nil)
  raise ConcreteClassMustImplementError
end

#reset!Object

Deletes all message_bus data from the backend. Use with extreme caution.



83
84
85
# File 'lib/message_bus/backends/base.rb', line 83

def reset!
  raise ConcreteClassMustImplementError
end

#subscribe(channel, last_id = nil) {|message| ... } ⇒ nil

Subscribe to messages on a particular channel. Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will block until subscription is terminated.

Parameters:

  • channel (String)

    the name of the channel to which we should subscribe

  • last_id (#to_i) (defaults to: nil)

    the channel-specific ID of the last message that the caller received on the specified channel

Yields:

  • (message)

    a message-handler block

Yield Parameters:

Returns:

  • (nil)

Raises:



170
171
172
# File 'lib/message_bus/backends/base.rb', line 170

def subscribe(channel, last_id = nil)
  raise ConcreteClassMustImplementError
end