Class: MessageBus::Backends::Memory

Inherits:
Base
  • Object
show all
Defined in:
lib/message_bus/backends/memory.rb

Overview

Note:

This backend diverges from the standard in Base in the following ways:

  • Does not support forking

  • Does not support in-memory buffering of messages on publication (redundant)

The memory backend stores published messages in a simple array per channel, and does not store a separate global backlog.

Defined Under Namespace

Classes: Client

Constant Summary

Constants inherited from Base

Base::ConcreteClassMustImplementError, Base::UNSUB_MESSAGE

Instance Attribute Summary

Attributes inherited from Base

#clear_every, #max_backlog_age, #max_backlog_size, #max_global_backlog_size, #max_in_memory_publish_backlog, #subscribed

Instance Method Summary collapse

Constructor Details

#initialize(config = {}, max_backlog_size = 1000) ⇒ Memory

Returns a new instance of Memory.

Parameters:

  • config (Hash) (defaults to: {})
  • max_backlog_size (Integer) (defaults to: 1000)

    the largest permitted size (number of messages) for per-channel backlogs; beyond this capacity, old messages will be dropped.

Options Hash (config):

  • :logger (Logger)

    a logger to which logs will be output

  • :clear_every (Integer)

    the interval of publications between which the backlog will not be cleared



191
192
193
194
195
196
197
198
# File 'lib/message_bus/backends/memory.rb', line 191

def initialize(config = {}, max_backlog_size = 1000)
  @config = config
  @max_backlog_size = max_backlog_size
  @max_global_backlog_size = 2000
  # after 7 days inactive backlogs will be removed
  self.max_backlog_age = 604800
  @clear_every = config[:clear_every] || 1
end

Instance Method Details

#after_forkObject

No-op; this backend doesn’t support forking.

See Also:



206
207
208
# File 'lib/message_bus/backends/memory.rb', line 206

def after_fork
  nil
end

#backlog(channel, last_id = 0) ⇒ Array<MessageBus::Message>

Get messages from a channel backlog

Parameters:

  • channel (String)

    the name of the channel in question

  • last_id (#to_i) (defaults to: 0)

    the channel-specific ID of the last message that the caller received on the specified channel

Returns:

  • (Array<MessageBus::Message>)

    all messages published to the specified channel since the specified last ID



255
256
257
258
259
260
261
# File 'lib/message_bus/backends/memory.rb', line 255

def backlog(channel, last_id = 0)
  items = client.backlog channel, last_id.to_i

  items.map! do |id, data|
    MessageBus::Message.new id, id, channel, data
  end
end

#destroyObject

No-op; this backend doesn’t maintain any storage connections. (see Base#destroy)



217
218
219
# File 'lib/message_bus/backends/memory.rb', line 217

def destroy
  nil
end

#expire_all_backlogs!Object

This method is abstract.

Deletes all backlogs and their data. Does not delete non-backlog data that message_bus may persist, depending on the concrete backend implementation. Use with extreme caution.



222
223
224
# File 'lib/message_bus/backends/memory.rb', line 222

def expire_all_backlogs!
  client.expire_all_backlogs!
end

#get_message(channel, message_id) ⇒ MessageBus::Message?

Get a specific message from a channel

Parameters:

  • channel (String)

    the name of the channel in question

  • message_id (Integer)

    the channel-specific ID of the message required

Returns:



273
274
275
276
277
278
279
# File 'lib/message_bus/backends/memory.rb', line 273

def get_message(channel, message_id)
  if data = client.get_value(channel, message_id)
    MessageBus::Message.new message_id, message_id, channel, data
  else
    nil
  end
end

#global_backlog(last_id = 0) ⇒ Array<MessageBus::Message>

Get messages from the global backlog

Parameters:

  • last_id (#to_i) (defaults to: 0)

    the global ID of the last message that the caller received

Returns:

  • (Array<MessageBus::Message>)

    all messages published on any channel since the specified last ID



264
265
266
267
268
269
270
# File 'lib/message_bus/backends/memory.rb', line 264

def global_backlog(last_id = 0)
  items = client.global_backlog last_id.to_i

  items.map! do |id, channel, data|
    MessageBus::Message.new id, id, channel, data
  end
end

#global_subscribe(last_id = nil) {|message| ... } ⇒ nil

Subscribe to messages on all channels. Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will block until subscription is terminated.

Parameters:

  • last_id (#to_i) (defaults to: nil)

    the global ID of the last message that the caller received

Yields:

  • (message)

    a message-handler block

Yield Parameters:

Returns:

  • (nil)

Raises:

  • (ArgumentError)


299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
# File 'lib/message_bus/backends/memory.rb', line 299

def global_subscribe(last_id = nil)
  raise ArgumentError unless block_given?

  highest_id = last_id

  begin
    client.subscribe do |on|
      h = {}

      on.subscribe do
        if highest_id
          process_global_backlog(highest_id) do |m|
            h[m.global_id] = true
            yield m
          end
        end
        @subscribed = true
      end

      on.unsubscribe do
        @subscribed = false
      end

      on.message do |_c, m|
        m = MessageBus::Message.decode m

        # we have 3 options
        #
        # 1. message came in the correct order GREAT, just deal with it
        # 2. message came in the incorrect order COMPLICATED, wait a tiny bit and clear backlog
        # 3. message came in the incorrect order and is lowest than current highest id, reset

        if h
          # If already yielded during the clear backlog when subscribing,
          # don't yield a duplicate copy.
          unless h.delete(m.global_id)
            h = nil if h.empty?
            yield m
          end
        else
          yield m
        end
      end
    end
  rescue => error
    @config[:logger].warn "#{error} subscribe failed, reconnecting in 1 second. Call stack\n#{error.backtrace.join("\n")}"
    sleep 1
    retry
  end
end

#global_unsubscribeObject

Causes all subscribers to the bus to unsubscribe, and terminates the local connection. Typically used to reset tests.



293
294
295
296
# File 'lib/message_bus/backends/memory.rb', line 293

def global_unsubscribe
  client.unsubscribe
  @subscribed = false
end

#last_id(channel) ⇒ Integer

Get the ID of the last message published on a channel

Parameters:

  • channel (String)

    the name of the channel in question

Returns:

  • (Integer)

    the channel-specific ID of the last message published to the given channel



243
244
245
# File 'lib/message_bus/backends/memory.rb', line 243

def last_id(channel)
  client.max_id(channel)
end

#last_ids(*channels) ⇒ Array<Integer>

Get the ID of the last message published on multiple channels

Parameters:

  • channels (Array<String>)
    • array of channels to fetch

Returns:

  • (Array<Integer>)

    the channel-specific IDs of the last message published to each requested channel



248
249
250
251
252
# File 'lib/message_bus/backends/memory.rb', line 248

def last_ids(*channels)
  channels.map do |c|
    last_id(c)
  end
end

#max_backlog_age=(value) ⇒ Object



200
201
202
# File 'lib/message_bus/backends/memory.rb', line 200

def max_backlog_age=(value)
  client.max_backlog_age = value
end

#publish(channel, data, opts = nil) ⇒ Integer

TODO:

:queue_in_memory NOT SUPPORTED

Publishes a message to a channel

Parameters:

  • channel (String)

    the name of the channel to which the message should be published

  • data (JSON)

    some data to publish to the channel. Must be an object that can be encoded as JSON

  • opts (Hash) (defaults to: nil)

Options Hash (opts):

  • :queue_in_memory (Boolean) — default: true

    whether or not to hold the message in an in-memory buffer if publication fails, to be re-tried later

  • :max_backlog_age (Integer) — default: `self.max_backlog_age`

    the longest amount of time a message may live in a backlog before being removed, in seconds

  • :max_backlog_size (Integer) — default: `self.max_backlog_size`

    the largest permitted size (number of messages) for the channel backlog; beyond this capacity, old messages will be dropped

Returns:

  • (Integer)

    the channel-specific ID the message was given



228
229
230
231
232
233
234
235
236
237
238
239
240
# File 'lib/message_bus/backends/memory.rb', line 228

def publish(channel, data, opts = nil)
  c = client
  max_backlog_age = opts && opts[:max_backlog_age]
  backlog_id = c.add(channel, data, max_backlog_age: max_backlog_age)

  if backlog_id % clear_every == 0
    max_backlog_size = (opts && opts[:max_backlog_size]) || self.max_backlog_size
    c.clear_global_backlog(backlog_id, @max_global_backlog_size)
    c.clear_channel_backlog(channel, backlog_id, max_backlog_size)
  end

  backlog_id
end

#reset!Object

Deletes all message_bus data from the backend. Use with extreme caution.



211
212
213
# File 'lib/message_bus/backends/memory.rb', line 211

def reset!
  client.reset!
end

#subscribe(channel, last_id = nil) {|message| ... } ⇒ nil

Subscribe to messages on a particular channel. Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will block until subscription is terminated.

Parameters:

  • channel (String)

    the name of the channel to which we should subscribe

  • last_id (#to_i) (defaults to: nil)

    the channel-specific ID of the last message that the caller received on the specified channel

Yields:

  • (message)

    a message-handler block

Yield Parameters:

Returns:

  • (nil)

Raises:

  • (ArgumentError)


282
283
284
285
286
287
288
289
290
# File 'lib/message_bus/backends/memory.rb', line 282

def subscribe(channel, last_id = nil)
  # trivial implementation for now,
  #   can cut down on connections if we only have one global subscriber
  raise ArgumentError unless block_given?

  global_subscribe(last_id) do |m|
    yield m if m.channel == channel
  end
end