Class: Redstream::Delayer

Inherits:
Object
Defined in:
lib/redstream/delayer.rb

Overview

The Redstream::Delayer class reads messages from special delay streams. These streams are used to repair inconsistencies caused by network or other failures that occur between the after_save and after_commit callbacks. To make such repairs possible, a delay message is added to a delay stream within an after_save callback. Delay messages are not fetched immediately but only after a configurable delay (e.g. 5 minutes), by which time the database transaction can be assumed to have either been committed or rolled back and to no longer be running.

Examples:

Redstream::Delayer.new(stream_name: "users", delay: 5.minutes, logger: Logger.new(STDOUT)).run

Instance Method Summary

Constructor Details

#initialize(stream_name:, delay:, logger: Logger.new("/dev/null")) ⇒ Delayer

Initializes the delayer for the specified stream name and delay.

Parameters:

  • stream_name (String)

    The stream name. Note that redstream adds a prefix to the redis keys, but the stream_name param must be specified here without any prefix. When using Redstream::Model, the stream name is the downcased, pluralized and underscored version of the model name, e.g. the stream name for a ‘User’ model is ‘users’.

  • delay (Fixnum, Float, ActiveSupport::Duration)

    The delay, i.e. the minimum age a message must reach before it is processed. Numeric values are interpreted as seconds.

  • logger (Logger) (defaults to: Logger.new("/dev/null"))

    The logger used for debug and error messages.



# File 'lib/redstream/delayer.rb', line 28

def initialize(stream_name:, delay:, logger: Logger.new("/dev/null"))
  @stream_name = stream_name
  @delay = delay
  @logger = logger

  @consumer = Consumer.new(name: "delayer", stream_name: "#{stream_name}.delay", logger: logger)
  @batch = []
end
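
The constructor derives the name of the delay stream from stream_name and later normalizes the delay with to_f. The following is a minimal sketch of those two conversions, not part of Redstream itself: the helpers delay_stream_name and delay_in_seconds are hypothetical names introduced only for illustration.

```ruby
# Hypothetical helper (not part of Redstream): mirrors the string
# interpolation used in #initialize to name the delay stream.
def delay_stream_name(stream_name)
  "#{stream_name}.delay"
end

# Hypothetical helper (not part of Redstream): mirrors the @delay.to_f call
# in #run_once, which turns Integer and Float delays into seconds.
def delay_in_seconds(delay)
  delay.to_f
end

puts delay_stream_name("users") # prints "users.delay"
puts delay_in_seconds(300)      # prints 300.0
puts delay_in_seconds(1.5)      # prints 1.5
```

An ActiveSupport::Duration such as 5.minutes also responds to to_f, which is presumably why it is listed as an accepted type for delay.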

Instance Method Details

#run ⇒ Object

Loops and blocks forever processing delay messages read from a delay stream.



# File 'lib/redstream/delayer.rb', line 40

def run
  loop { run_once }
end

#run_once ⇒ Object

Reads and processes a single batch of delay messages from a delay stream. You usually want to use the #run method instead, which loops/blocks forever.



# File 'lib/redstream/delayer.rb', line 48

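def run_once
  @consumer.run_once do |messages|
    messages.each do |message|
      # message_id is expected to begin with a millisecond timestamp, so this
      # is the time in seconds until the message reaches the configured age.
      seconds_to_sleep = (message.message_id.to_f / 1_000) + @delay.to_f - Time.now.to_f

      if seconds_to_sleep > 0
        # The message is not old enough yet: flush the messages batched so
        # far and commit the last of them before going to sleep.
        if @batch.size > 0
          id = @batch.last.message_id

          deliver

          @consumer.commit id
        end

        # Wait until the message is due, plus one extra second.
        sleep(seconds_to_sleep + 1)
      end

      @batch << message
    end

    # Flush whatever is left in the batch (deliver is a helper not shown here).
    deliver
  end
rescue StandardError => e
  # Log the error, wait 5 seconds and run the method body again.
  @logger.error e

  sleep 5

  retry
end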
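
The seconds_to_sleep expression in #run_once can be checked with a small worked example. This is a sketch under stated assumptions: seconds_until_due is a hypothetical helper that is not part of Redstream, the message ids are made up, and the ids are assumed to follow the Redis stream format of a millisecond timestamp followed by a sequence number.

```ruby
# Hypothetical helper (not part of Redstream): the same arithmetic as the
# seconds_to_sleep expression, with the current time passed in explicitly.
def seconds_until_due(message_id, delay, now)
  # String#to_f parses the leading number and ignores the "-<seq>" suffix.
  (message_id.to_f / 1_000) + delay.to_f - now.to_f
end

now = Time.at(1_700_000_300)

fresh_id = "1700000290000-0" # made-up id, created 10 seconds before now
old_id   = "1700000000000-3" # made-up id, created 300 seconds before now

# With a 60 second delay, the fresh message is due in 50 seconds.
puts seconds_until_due(fresh_id, 60, now) # prints 50.0

# The old message is already overdue, so the result is negative and
# #run_once would add it to the batch without sleeping.
puts seconds_until_due(old_id, 60, now)   # prints -240.0
```

Only a positive result triggers the flush, commit and sleep branch. A zero or negative result means the message is already old enough and goes straight into the batch.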