Class: Redstream::Producer
- Inherits: Object
- Includes: MonitorMixin
- Defined in: lib/redstream/producer.rb
Overview
A Redstream::Producer is responsible for writing the actual messages to redis. This includes the delay messages as well as the messages for immediate retrieval. Usually, you don’t have to use a producer directly. Instead, Redstream::Model handles all producer-related interaction. However, Redstream::Model cannot recognize updates which bypass model callbacks, e.g. updates via #update_all, #delete_all, etc. Thus, calls to e.g. #update_all must be wrapped with `find_in_batches` and Redstream::Producer#bulk to write these updates to the redis streams as well.
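The wrapping pattern described above might look like the following sketch. To keep it runnable without a database or redis, `SketchProducer` and `each_slice` are hypothetical stand-ins for Redstream::Producer and `find_in_batches`; only the shape of the calls is meant to carry over.

```ruby
# Hypothetical stand-in for Redstream::Producer: it only logs what the real
# producer would write to redis, so the sketch runs without any services.
class SketchProducer
  attr_reader :log

  def initialize
    @log = []
  end

  def bulk(records)
    batch = Array(records)
    @log << [:delay, batch.map(&:id)] # delay messages are written first
    yield                             # the callback-bypassing update runs here
    @log << [:queue, batch.map(&:id)] # then the messages for immediate retrieval
  end
end

Product = Struct.new(:id, :price)
products = (1..5).map { |id| Product.new(id, 10) }
producer = SketchProducer.new

# each_slice stands in for find_in_batches in a real application.
products.each_slice(2) do |batch|
  producer.bulk(batch) do
    # Stand-in for Product.where(id: batch.map(&:id)).update_all(price: 20)
    batch.each { |product| product.price = 20 }
  end
end
```

In an application, the loop would presumably use `find_in_batches` on the model and scope the `update_all` call to the ids of the current batch, so that the update and the messages cover the same records.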
Instance Method Summary
- #bulk(records) ⇒ Object
  Use to wrap calls to #update_all, #delete_all, etc.
- #bulk_delay(records) ⇒ Object (private)
  Writes delay messages to a delay stream in redis.
- #bulk_delete(records, ids) ⇒ Object (private)
  Deletes delay messages from a delay stream in redis.
- #bulk_queue(records) ⇒ Object (private)
  Writes messages to a stream in redis for immediate retrieval.
- #delay(object) ⇒ Object (private)
  Writes a single delay message to a delay stream in redis.
- #delete(object, id) ⇒ Object (private)
  Deletes a single delay message from a delay stream in redis.
- #initialize(wait: false) ⇒ Producer (constructor)
  Initializes a new producer.
- #queue(object) ⇒ Object (private)
  Writes a single message to a stream in redis for immediate retrieval.
Constructor Details
#initialize(wait: false) ⇒ Producer
Initializes a new producer. If you’re using a distributed redis setup, you can pass the wait param to have the producer issue redis WAIT after writing delay messages, which improves real-world data safety.
# File 'lib/redstream/producer.rb', line 32

def initialize(wait: false)
  @wait = wait
  @stream_name_cache = {}

  super()
end
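The method bodies on this page pass the wait value to redis as `redis.wait(@wait, 0)`, i.e. as the number of replicas for WAIT with a timeout of 0, which blocks until that many replicas have acknowledged the write. A minimal sketch of that guard follows; `RecordingRedis` and `maybe_wait` are hypothetical names used for illustration, not part of Redstream.

```ruby
# Hypothetical stand-in for a redis connection that records the commands
# it receives instead of talking to a server.
class RecordingRedis
  attr_reader :commands

  def initialize
    @commands = []
  end

  def wait(numreplicas, timeout)
    @commands << [:wait, numreplicas, timeout]
  end
end

# Mirrors the guard used by the producer: WAIT is only issued when the
# wait option is truthy, with the option as the replica count.
def maybe_wait(redis, wait)
  redis.wait(wait, 0) if wait
end

without_wait = RecordingRedis.new
maybe_wait(without_wait, false)

with_wait = RecordingRedis.new
maybe_wait(with_wait, 2)
```

With the default `wait: false`, no WAIT command is sent at all. A value such as `wait: 2` (chosen here only as an example) would ask redis to wait for two replicas.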
Instance Method Details
#bulk(records) ⇒ Object
Use to wrap calls to #update_all, #delete_all, etc., i.e. methods which bypass model lifecycle callbacks (after_save, etc.), as Redstream::Model can’t recognize these updates and write them to redis streams automatically. You need to pass the records to be updated to the bulk method. The bulk method writes delay messages for the records to redis, then yields, then writes the messages for immediate retrieval and finally deletes the delay messages again. The method must ensure that the same set of records is used for the delay messages and the instant messages. Thus, you should ideally pass an array of records to it. If you pass an ActiveRecord::Relation, the method converts it to an array, i.e. it loads all matching records into memory.
# File 'lib/redstream/producer.rb', line 52

def bulk(records)
  records_array = Array(records)

  ids = bulk_delay(records_array)

  yield

  bulk_queue(records_array)
  bulk_delete(records_array, ids)
end
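The order of the four steps can be illustrated with an in-memory sketch. `InMemoryBulk` is a hypothetical model of the two streams, not the Redstream implementation, and the integer message ids are made up.

```ruby
require "json"

# Hypothetical in-memory model of the two streams used by #bulk. It is not
# the Redstream implementation, only an illustration of the call order.
class InMemoryBulk
  attr_reader :delay_stream, :main_stream, :events

  def initialize
    @delay_stream = {} # message id => payload
    @main_stream = []
    @events = []
    @next_id = 0
  end

  def bulk(records)
    records_array = Array(records)

    # 1. Write one delay message per record and remember the ids.
    ids = records_array.map do |record|
      id = (@next_id += 1)
      @delay_stream[id] = JSON.dump(record)
      id
    end
    @events << :delay

    # 2. Let the caller perform the update that bypasses callbacks.
    yield
    @events << :yield

    # 3. Write the messages for immediate retrieval.
    records_array.each { |record| @main_stream << JSON.dump(record) }
    @events << :queue

    # 4. Remove the delay messages written in step 1.
    ids.each { |id| @delay_stream.delete(id) }
    @events << :delete
  end
end

sketch = InMemoryBulk.new
pending_during_update = nil

sketch.bulk([{ id: 1 }, { id: 2 }]) do
  # While the update runs, only the delay messages exist.
  pending_during_update = sketch.delay_stream.size
end
```

If the process stopped between steps 1 and 3, the delay messages would remain in the delay stream, since they are only removed in step 4.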
#bulk_delay(records) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Writes delay messages to a delay stream in redis.
# File 'lib/redstream/producer.rb', line 71

def bulk_delay(records)
  res = records.each_slice(250).flat_map do |slice|
    Redstream.connection_pool.with do |redis|
      redis.pipelined do
        slice.each do |object|
          redis.xadd Redstream.stream_key_name("#{stream_name(object)}.delay"), payload: JSON.dump(object.redstream_payload)
        end
      end
    end
  end

  Redstream.connection_pool.with do |redis|
    redis.wait(@wait, 0) if @wait
  end

  res
end
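The method relies on `each_slice(250).flat_map` to issue one pipeline per slice while still returning a single flat list of message ids in record order. The following plain Ruby sketch shows that shape; `fake_pipeline` is a hypothetical stand-in for the pipelined block, and the ids it returns are made up.

```ruby
# Hypothetical stand-in for the pipelined block: returns one fake message
# id per record in the slice, in order.
def fake_pipeline(slice)
  slice.map { |record| "id-#{record}" }
end

records = (1..600).to_a
slice_sizes = []

ids = records.each_slice(250).flat_map do |slice|
  slice_sizes << slice.size
  fake_pipeline(slice)
end
```

Here 600 records are split into slices of 250, 250 and 100, and `ids[n]` still belongs to `records[n]`.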
#bulk_delete(records, ids) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Deletes delay messages from a delay stream in redis.
# File 'lib/redstream/producer.rb', line 96

def bulk_delete(records, ids)
  records.each_with_index.each_slice(250) do |slice|
    Redstream.connection_pool.with do |redis|
      redis.pipelined do
        slice.each do |object, index|
          redis.xdel Redstream.stream_key_name("#{stream_name(object)}.delay"), ids[index]
        end
      end
    end
  end
end
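Chaining `each_with_index` before `each_slice(250)` keeps each record paired with its position in the full array, so `ids[index]` stays aligned across slice boundaries. A plain Ruby sketch with made-up records and ids:

```ruby
records = (1..600).map { |n| "record-#{n}" }
ids = (1..600).map { |n| "id-#{n}" } # made-up ids, one per record

deleted = []
slices = 0

records.each_with_index.each_slice(250) do |slice|
  slices += 1

  slice.each do |record, index|
    # index is the position in the full records array, not within the slice.
    deleted << [record, ids[index]]
  end
end
```

Slicing first and calling `each_with_index` inside each slice would restart the index at 0 for every slice and pair records with the wrong ids.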
#bulk_queue(records) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Writes messages to a stream in redis for immediate retrieval.
# File 'lib/redstream/producer.rb', line 114

def bulk_queue(records)
  records.each_slice(250) do |slice|
    Redstream.connection_pool.with do |redis|
      redis.pipelined do
        slice.each do |object|
          redis.xadd Redstream.stream_key_name(stream_name(object)), payload: JSON.dump(object.redstream_payload)
        end
      end
    end
  end

  true
end
#delay(object) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Writes a single delay message to a delay stream in redis.
# File 'lib/redstream/producer.rb', line 136

def delay(object)
  Redstream.connection_pool.with do |redis|
    res = redis.xadd(Redstream.stream_key_name("#{stream_name(object)}.delay"), payload: JSON.dump(object.redstream_payload))
    redis.wait(@wait, 0) if @wait
    res
  end
end
#delete(object, id) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Deletes a single delay message from a delay stream in redis.
# File 'lib/redstream/producer.rb', line 151

def delete(object, id)
  Redstream.connection_pool.with do |redis|
    redis.xdel Redstream.stream_key_name("#{stream_name(object)}.delay"), id
  end
end
#queue(object) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Writes a single message to a stream in redis for immediate retrieval.
# File 'lib/redstream/producer.rb', line 163

def queue(object)
  Redstream.connection_pool.with do |redis|
    redis.xadd Redstream.stream_key_name(stream_name(object)), payload: JSON.dump(object.redstream_payload)
  end

  true
end
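Every message written by the producer carries a single `payload` field containing the JSON-encoded result of `redstream_payload`. The sketch below shows the encoding round trip in isolation; the `Item` struct and its `redstream_payload` are hypothetical, as the real payload is whatever the model returns.

```ruby
require "json"

# Hypothetical record; the real payload is whatever the model's
# redstream_payload method returns.
Item = Struct.new(:id) do
  def redstream_payload
    { id: id }
  end
end

# The field hash as it would be passed to xadd.
fields = { payload: JSON.dump(Item.new(42).redstream_payload) }

# A reader of the stream would decode the field again. Note that symbol
# keys come back as strings.
decoded = JSON.parse(fields[:payload])
```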