Class: MessageBus::Backends::Redis

Inherits:
Base
  • Object
show all
Defined in:
lib/message_bus/backends/redis.rb

Overview

Note:

This backend diverges from the standard in Base in the following ways:

  • ‘max_backlog_age` options in this backend differ from the behaviour of other backends, in that either no messages are removed (when publications happen more regularly than this time-frame) or all messages are removed (when no publication happens during this time-frame).

The Redis backend stores published messages in Redis sorted sets (using ZADD, where the score is the message ID), one for each channel (where the full message is stored), and also in a global backlog as a simple pointer to the respective channel and channel-specific ID. In addition, publication publishes full messages to a Redis PubSub channel; this is used for actively subscribed message_bus servers to consume published messages in real-time while connected and forward them to subscribers, while catch-up is performed from the backlog sorted sets.

Message lookup is performed using the Redis ZRANGEBYSCORE command, and backlog trimming uses ZREMRANGEBYSCORE. The last used channel-specific and global IDs are stored as integers in simple Redis keys and incremented on publication.

Publication is implemented using a Lua script to ensure that it is atomic and messages are not corrupted by parallel publication.

Defined Under Namespace

Classes: BackLogOutOfOrder

Constant Summary collapse

LUA_PUBLISH =

Note, the script takes care of all expiry of keys, however we do not expire the global backlog key cause we have no simple way to determine what it should be on publish we do not provide a mechanism to set a global max backlog age, only a per-channel which we can override on publish

<<LUA

      local start_payload = ARGV[1]
      local max_backlog_age = ARGV[2]
      local max_backlog_size = tonumber(ARGV[3])
      local max_global_backlog_size = tonumber(ARGV[4])
      local channel = ARGV[5]
      local clear_every = ARGV[6]

      local global_id_key = KEYS[1]
      local backlog_id_key = KEYS[2]
      local backlog_key = KEYS[3]
      local global_backlog_key = KEYS[4]
      local redis_channel_name = KEYS[5]

      local global_id = redis.call("INCR", global_id_key)
      local backlog_id = redis.call("INCR", backlog_id_key)
      local payload = table.concat({ global_id, backlog_id, start_payload }, "|")
      local global_backlog_message = table.concat({ backlog_id, channel }, "|")

      redis.call("ZADD", backlog_key, backlog_id, payload)
      redis.call("EXPIRE", backlog_key, max_backlog_age)
      redis.call("ZADD", global_backlog_key, global_id, global_backlog_message)
      redis.call("EXPIRE", global_backlog_key, max_backlog_age)
      redis.call("PUBLISH", redis_channel_name, payload)

      redis.call("EXPIRE", backlog_id_key, max_backlog_age)

      if backlog_id > max_backlog_size and backlog_id % clear_every == 0 then
        redis.call("ZREMRANGEBYSCORE", backlog_key, 1, backlog_id - max_backlog_size)
      end

      if global_id > max_global_backlog_size and global_id % clear_every == 0 then
        redis.call("ZREMRANGEBYSCORE", global_backlog_key, 1, global_id - max_global_backlog_size)
      end

      return backlog_id
LUA
LUA_PUBLISH_SHA1 =
Digest::SHA1.hexdigest(LUA_PUBLISH)

Constants inherited from Base

Base::ConcreteClassMustImplementError, Base::UNSUB_MESSAGE

Instance Attribute Summary

Attributes inherited from Base

#clear_every, #max_backlog_age, #max_backlog_size, #max_global_backlog_size, #max_in_memory_publish_backlog, #subscribed

Instance Method Summary collapse

Constructor Details

#initialize(redis_config = {}, max_backlog_size = 1000) ⇒ Redis

Returns a new instance of Redis.

Parameters:

  • redis_config (Hash) (defaults to: {})

    in addition to the options listed, see github.com/redis/redis-rb for other available options

  • max_backlog_size (Integer) (defaults to: 1000)

    the largest permitted size (number of messages) for per-channel backlogs; beyond this capacity, old messages will be dropped.

Options Hash (redis_config):

  • :logger (Logger)

    a logger to which logs will be output

  • :enable_redis_logger (Boolean) — default: false

    whether or not to enable logging by the underlying Redis library

  • :clear_every (Integer) — default: 1

    the interval of publications between which the backlog will not be cleared



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/message_bus/backends/redis.rb', line 48

def initialize(redis_config = {}, max_backlog_size = 1000)
  @redis_config = redis_config.dup
  @clear_every = redis_config.delete(:clear_every) || 1
  @logger = @redis_config[:logger]
  @redis_config[:logger] = nil unless @redis_config[:enable_redis_logger]
  @max_backlog_size = max_backlog_size
  @max_global_backlog_size = 2000
  @max_in_memory_publish_backlog = 1000
  @in_memory_backlog = []
  @lock = Mutex.new
  @flush_backlog_thread = nil
  @pub_redis = nil
  @subscribed = false
  # after 7 days inactive backlogs will be removed
  @max_backlog_age = 604_800
end

Instance Method Details

#after_forkObject

Reconnects to Redis; used after a process fork, typically triggered by a forking webserver

See Also:



67
68
69
# File 'lib/message_bus/backends/redis.rb', line 67

def after_fork
  @pub_redis&.disconnect!
end

#backlog(channel, last_id = 0) ⇒ Array<MessageBus::Message>

Get messages from a channel backlog

Parameters:

  • channel (String)

    the name of the channel in question

  • last_id (#to_i) (defaults to: 0)

    the channel-specific ID of the last message that the caller received on the specified channel

Returns:

  • (Array<MessageBus::Message>)

    all messages published to the specified channel since the specified last ID



203
204
205
206
207
208
209
# File 'lib/message_bus/backends/redis.rb', line 203

def backlog(channel, last_id = 0)
  redis = pub_redis
  backlog_key = backlog_key(channel)
  items = redis.zrangebyscore backlog_key, last_id.to_i + 1, "+inf"

  items.map { |i| MessageBus::Message.decode(i) }
end

#destroyObject

Closes all open connections to the storage.



77
78
79
# File 'lib/message_bus/backends/redis.rb', line 77

def destroy
  @pub_redis&.disconnect!
end

#expire_all_backlogs!Object

Deletes all backlogs and their data. Does not delete ID pointers, so new publications will get IDs that continue from the last publication before the expiry. Use with extreme caution.



83
84
85
# File 'lib/message_bus/backends/redis.rb', line 83

def expire_all_backlogs!
  pub_redis.keys("__mb_*backlog_n").each { |k| pub_redis.del k }
end

#get_message(channel, message_id) ⇒ MessageBus::Message?

Get a specific message from a channel

Parameters:

  • channel (String)

    the name of the channel in question

  • message_id (Integer)

    the channel-specific ID of the message required

Returns:



228
229
230
231
232
233
234
235
236
237
238
# File 'lib/message_bus/backends/redis.rb', line 228

def get_message(channel, message_id)
  redis = pub_redis
  backlog_key = backlog_key(channel)

  items = redis.zrangebyscore backlog_key, message_id, message_id
  if items && items[0]
    MessageBus::Message.decode(items[0])
  else
    nil
  end
end

#global_backlog(last_id = 0) ⇒ Array<MessageBus::Message>

Get messages from the global backlog

Parameters:

  • last_id (#to_i) (defaults to: 0)

    the global ID of the last message that the caller received

Returns:

  • (Array<MessageBus::Message>)

    all messages published on any channel since the specified last ID



212
213
214
215
216
217
218
219
220
221
222
223
224
225
# File 'lib/message_bus/backends/redis.rb', line 212

def global_backlog(last_id = 0)
  items = pub_redis.zrangebyscore global_backlog_key, last_id.to_i + 1, "+inf"

  items.map! do |i|
    pipe = i.index "|"
    message_id = i[0..pipe].to_i
    channel = i[pipe + 1..-1]
    m = get_message(channel, message_id)
    m
  end

  items.compact!
  items
end

#global_subscribe(last_id = nil) {|message| ... } ⇒ nil

Subscribe to messages on all channels. Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will block until subscription is terminated.

Parameters:

  • last_id (#to_i) (defaults to: nil)

    the global ID of the last message that the caller received

Yields:

  • (message)

    a message-handler block

Yield Parameters:

Returns:

  • (nil)

Raises:

  • (ArgumentError)


268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
# File 'lib/message_bus/backends/redis.rb', line 268

def global_subscribe(last_id = nil, &blk)
  raise ArgumentError unless block_given?

  highest_id = last_id

  clear_backlog =
    lambda do
      retries = 4
      begin
        highest_id = process_global_backlog(highest_id, retries > 0, &blk)
      rescue BackLogOutOfOrder => e
        highest_id = e.highest_id
        retries -= 1
        sleep(rand(50) / 1000.0)
        retry
      end
    end

  begin
    global_redis = new_redis_connection

    clear_backlog.call(&blk) if highest_id

    global_redis.subscribe(redis_channel_name) do |on|
      on.subscribe do
        clear_backlog.call(&blk) if highest_id
        @subscribed = true
      end

      on.unsubscribe { @subscribed = false }

      on.message do |_c, m|
        if m == UNSUB_MESSAGE
          @subscribed = false
          global_redis.unsubscribe
          return
        end
        m = MessageBus::Message.decode m

        # we have 3 options
        #
        # 1. message came in the correct order GREAT, just deal with it
        # 2. message came in the incorrect order COMPLICATED, wait a tiny bit and clear backlog
        # 3. message came in the incorrect order and is lowest than current highest id, reset

        if highest_id.nil? || m.global_id == highest_id + 1
          highest_id = m.global_id
          yield m
        else
          clear_backlog.call(&blk)
        end
      end
    end
  rescue => error
    @logger.warn "#{error} subscribe failed, reconnecting in 1 second. Call stack #{error.backtrace.join("\n")}"
    sleep 1
    global_redis&.disconnect!
    retry
  ensure
    global_redis&.disconnect!
  end
end

#global_unsubscribeObject

Causes all subscribers to the bus to unsubscribe, and terminates the local connection. Typically used to reset tests.



257
258
259
260
261
262
263
264
265
# File 'lib/message_bus/backends/redis.rb', line 257

def global_unsubscribe
  begin
    new_redis = new_redis_connection
    new_redis.publish(redis_channel_name, UNSUB_MESSAGE)
  ensure
    new_redis&.disconnect!
    @subscribed = false
  end
end

#last_id(channel) ⇒ Integer

Get the ID of the last message published on a channel

Parameters:

  • channel (String)

    the name of the channel in question

Returns:

  • (Integer)

    the channel-specific ID of the last message published to the given channel



190
191
192
193
# File 'lib/message_bus/backends/redis.rb', line 190

def last_id(channel)
  backlog_id_key = backlog_id_key(channel)
  pub_redis.get(backlog_id_key).to_i
end

#last_ids(*channels) ⇒ Array<Integer>

Get the ID of the last message published on multiple channels

Parameters:

  • channels (Array<String>)
    • array of channels to fetch

Returns:

  • (Array<Integer>)

    the channel-specific IDs of the last message published to each requested channel



196
197
198
199
200
# File 'lib/message_bus/backends/redis.rb', line 196

def last_ids(*channels)
  return [] if channels.size == 0
  backlog_id_keys = channels.map { |c| backlog_id_key(c) }
  pub_redis.mget(*backlog_id_keys).map(&:to_i)
end

#publish(channel, data, opts = nil) ⇒ Integer

Publishes a message to a channel

Parameters:

  • channel (String)

    the name of the channel to which the message should be published

  • data (JSON)

    some data to publish to the channel. Must be an object that can be encoded as JSON

  • opts (Hash) (defaults to: nil)

Options Hash (opts):

  • :queue_in_memory (Boolean) — default: true

    whether or not to hold the message in an in-memory buffer if publication fails, to be re-tried later

  • :max_backlog_age (Integer) — default: `self.max_backlog_age`

    the longest amount of time a message may live in a backlog before being removed, in seconds

  • :max_backlog_size (Integer) — default: `self.max_backlog_size`

    the largest permitted size (number of messages) for the channel backlog; beyond this capacity, old messages will be dropped

Returns:

  • (Integer)

    the channel-specific ID the message was given



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/message_bus/backends/redis.rb', line 132

def publish(channel, data, opts = nil)
  queue_in_memory = (opts && opts[:queue_in_memory]) != false

  max_backlog_age = (opts && opts[:max_backlog_age]) || self.max_backlog_age
  max_backlog_size = (opts && opts[:max_backlog_size]) || self.max_backlog_size

  redis = pub_redis
  backlog_id_key = backlog_id_key(channel)
  backlog_key = backlog_key(channel)

  msg = MessageBus::Message.new nil, nil, channel, data

  cached_eval(
    redis,
    LUA_PUBLISH,
    LUA_PUBLISH_SHA1,
    argv: [
      msg.encode_without_ids,
      max_backlog_age,
      max_backlog_size,
      max_global_backlog_size,
      channel,
      clear_every,
    ],
    keys: [
      global_id_key,
      backlog_id_key,
      backlog_key,
      global_backlog_key,
      redis_channel_name,
    ],
  )
rescue ::Redis::CommandError => e
  if queue_in_memory && e.message =~ /READONLY/
    @lock.synchronize do
      @in_memory_backlog << [channel, data]
      if @in_memory_backlog.length > @max_in_memory_publish_backlog
        @in_memory_backlog.delete_at(0)
        @logger.warn(
          "Dropping old message cause max_in_memory_publish_backlog is full: #{e.message}\n#{e.backtrace.join('\n')}",
        )
      end
    end

    if @flush_backlog_thread == nil
      @lock.synchronize do
        if @flush_backlog_thread == nil
          @flush_backlog_thread = Thread.new { ensure_backlog_flushed }
        end
      end
    end
    nil
  else
    raise
  end
end

#reset!Object

Deletes all message_bus data from the backend. Use with extreme caution.



72
73
74
# File 'lib/message_bus/backends/redis.rb', line 72

def reset!
  pub_redis.keys("__mb_*").each { |k| pub_redis.del k }
end

#subscribe(channel, last_id = nil) {|message| ... } ⇒ nil

Subscribe to messages on a particular channel. Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will block until subscription is terminated.

Parameters:

  • channel (String)

    the name of the channel to which we should subscribe

  • last_id (#to_i) (defaults to: nil)

    the channel-specific ID of the last message that the caller received on the specified channel

Yields:

  • (message)

    a message-handler block

Yield Parameters:

Returns:

  • (nil)

Raises:

  • (ArgumentError)


241
242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/message_bus/backends/redis.rb', line 241

def subscribe(channel, last_id = nil)
  # trivial implementation for now,
  #   can cut down on connections if we only have one global subscriber
  raise ArgumentError unless block_given?

  if last_id
    # we need to translate this to a global id, at least give it a shot
    #   we are subscribing on global and global is always going to be bigger than local
    #   so worst case is a replay of a few messages
    message = get_message(channel, last_id)
    last_id = message.global_id if message
  end
  global_subscribe(last_id) { |m| yield m if m.channel == channel }
end