Class: Redstream::Delayer
- Inherits:
-
Object
- Object
- Redstream::Delayer
- Defined in:
- lib/redstream/delayer.rb
Overview
The Redstream::Delayer class is responsible for reading messages from special delay streams which are used to fix inconsistencies resulting from network or other issues in between after_save and after_commit callbacks. To be able to fix such issues, delay messages will be added to a delay stream within an after_save callback. The delay messages aren’t fetched immediately, but e.g. 5 minutes later, such that we can be sure that the database transaction is committed or has been rolled back, but is no longer running.
Instance Method Summary collapse
-
#initialize(stream_name:, delay:, logger: Logger.new("/dev/null")) ⇒ Delayer
constructor
Initializes the delayer for the specified stream name and delay.
-
#run ⇒ Object
Loops and blocks forever processing delay messages read from a delay stream.
-
#run_once ⇒ Object
Reads and processes a single batch of delay messages from a delay stream.
Constructor Details
#initialize(stream_name:, delay:, logger: Logger.new("/dev/null")) ⇒ Delayer
Initializes the delayer for the specified stream name and delay.
28 29 30 31 32 33 34 35 |
# File 'lib/redstream/delayer.rb', line 28 def initialize(stream_name:, delay:, logger: Logger.new("/dev/null")) @stream_name = stream_name @delay = delay @logger = logger @consumer = Consumer.new(name: "delayer", stream_name: "#{stream_name}.delay", logger: logger) @batch = [] end |
Instance Method Details
#run ⇒ Object
Loops and blocks forever processing delay messages read from a delay stream.
40 41 42 |
# File 'lib/redstream/delayer.rb', line 40 def run loop { run_once } end |
#run_once ⇒ Object
Reads and processes a single batch of delay messages from a delay stream. You usually want to use the #run method instead, which loops/blocks forever.
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/redstream/delayer.rb', line 48 def run_once @consumer.run_once do || .each do || seconds_to_sleep = (..to_f / 1_000) + @delay.to_f - Time.now.to_f if seconds_to_sleep > 0 if @batch.size > 0 id = @batch.last. deliver @consumer.commit id end sleep(seconds_to_sleep + 1) end @batch << end deliver end rescue StandardError => e @logger.error e sleep 5 retry end |