Class: Redstream::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/redstream/consumer.rb

Overview

The Redstream::Consumer class to read messages from a specified redis stream in batches.

Examples:

Redstream::Consumer.new(name: "user_indexer", stream_name: "users").run do |messages|
  # ...
end

Instance Method Summary collapse

Constructor Details

#initialize(name:, stream_name:, batch_size: 1_000, logger: Logger.new("/dev/null")) ⇒ Consumer

Initializes a new consumer instance. Please note that you can have multiple consumers per stream, by specifying different names.



28
29
30
31
32
33
34
35
# File 'lib/redstream/consumer.rb', line 28

def initialize(name:, stream_name:, batch_size: 1_000, logger: Logger.new("/dev/null"))
  @name = name
  @stream_name = stream_name
  @batch_size = batch_size
  @logger = logger
  @redis = Redstream.connection_pool.with(&:dup)
  @lock = Lock.new(name: "consumer:#{@stream_name}:#{@name}")
end

Instance Method Details

#commit(offset) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Commits the specified offset/ID as the maximum ID already read, such that subsequent read calls will use this offset/ID as a starting point.



109
110
111
# File 'lib/redstream/consumer.rb', line 109

def commit(offset)
  @redis.set Redstream.offset_key_name(stream_name: @stream_name, consumer_name: @name), offset
end

#max_committed_idString?

Returns its maximum committed id, i.e. the consumer’s offset.



41
42
43
# File 'lib/redstream/consumer.rb', line 41

def max_committed_id
  @redis.get Redstream.offset_key_name(stream_name: @stream_name, consumer_name: @name)
end

#run(&block) ⇒ Object

Loops and thus blocks forever while reading messages from the specified stream and yielding them in batches.

Examples:

consumer.run do |messages|
  # ...
end


53
54
55
# File 'lib/redstream/consumer.rb', line 53

def run(&block)
  loop { run_once(&block) }
end

#run_once(&block) ⇒ Object

Reads a single batch from the specified stream and yields it. You usually want to use the #run method instead, which loops/blocks forever.

Examples:

consumer.run_once do |messages|
  # ...
end


65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/redstream/consumer.rb', line 65

def run_once(&block)
  got_lock = @lock.acquire do
    offset = @redis.get(Redstream.offset_key_name(stream_name: @stream_name, consumer_name: @name))
    offset ||= "0-0"

    stream_key_name = Redstream.stream_key_name(@stream_name)

    response = begin
      @redis.xread(stream_key_name, offset, count: @batch_size, block: 5_000)
    rescue Redis::TimeoutError
      nil
    end

    return if response.nil? || response[stream_key_name].nil? || response[stream_key_name].empty?

    messages = response[stream_key_name].map do |raw_message|
      Message.new(raw_message)
    end

    block.call(messages)

    offset = response[stream_key_name].last[0]

    return unless offset

    commit offset
  end

  sleep(5) unless got_lock
rescue StandardError => e
  @logger.error e

  sleep 5

  retry
end