Class: Redstream::Consumer
- Inherits:
-
Object
- Object
- Redstream::Consumer
- Defined in:
- lib/redstream/consumer.rb
Overview
The Redstream::Consumer class to read messages from a specified redis stream in batches.
Instance Method Summary collapse
-
#commit(offset) ⇒ Object
private
Commits the specified offset/ID as the maximum ID already read, such that subsequent read calls will use this offset/ID as a starting point.
-
#initialize(name:, stream_name:, batch_size: 1_000, logger: Logger.new("/dev/null")) ⇒ Consumer
constructor
Initializes a new consumer instance.
-
#max_committed_id ⇒ String?
Returns its maximum committed id, i.e.
-
#run(&block) ⇒ Object
Loops and thus blocks forever while reading messages from the specified stream and yielding them in batches.
-
#run_once(&block) ⇒ Object
Reads a single batch from the specified stream and yields it.
Constructor Details
#initialize(name:, stream_name:, batch_size: 1_000, logger: Logger.new("/dev/null")) ⇒ Consumer
Initializes a new consumer instance. Please note that you can have multiple consumers per stream, by specifying different names.
28 29 30 31 32 33 34 35 |
# File 'lib/redstream/consumer.rb', line 28 def initialize(name:, stream_name:, batch_size: 1_000, logger: Logger.new("/dev/null")) @name = name @stream_name = stream_name @batch_size = batch_size @logger = logger @redis = Redstream.connection_pool.with(&:dup) @lock = Lock.new(name: "consumer:#{@stream_name}:#{@name}") end |
Instance Method Details
#commit(offset) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Commits the specified offset/ID as the maximum ID already read, such that subsequent read calls will use this offset/ID as a starting point.
109 110 111 |
# File 'lib/redstream/consumer.rb', line 109 def commit(offset) @redis.set Redstream.offset_key_name(stream_name: @stream_name, consumer_name: @name), offset end |
#max_committed_id ⇒ String?
Returns its maximum committed id, i.e. the consumer’s offset.
41 42 43 |
# File 'lib/redstream/consumer.rb', line 41 def max_committed_id @redis.get Redstream.offset_key_name(stream_name: @stream_name, consumer_name: @name) end |
#run(&block) ⇒ Object
Loops and thus blocks forever while reading messages from the specified stream and yielding them in batches.
53 54 55 |
# File 'lib/redstream/consumer.rb', line 53 def run(&block) loop { run_once(&block) } end |
#run_once(&block) ⇒ Object
Reads a single batch from the specified stream and yields it. You usually want to use the #run method instead, which loops/blocks forever.
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/redstream/consumer.rb', line 65 def run_once(&block) got_lock = @lock.acquire do offset = @redis.get(Redstream.offset_key_name(stream_name: @stream_name, consumer_name: @name)) offset ||= "0-0" stream_key_name = Redstream.stream_key_name(@stream_name) response = begin @redis.xread(stream_key_name, offset, count: @batch_size, block: 5_000) rescue Redis::TimeoutError nil end return if response.nil? || response[stream_key_name].nil? || response[stream_key_name].empty? = response[stream_key_name].map do || Message.new() end block.call() offset = response[stream_key_name].last[0] return unless offset commit offset end sleep(5) unless got_lock rescue StandardError => e @logger.error e sleep 5 retry end |