Class: Redis::Stream::Wrapper
- Inherits:
-
Object
- Object
- Redis::Stream::Wrapper
- Defined in:
- lib/redis/stream/wrapper.rb,
lib/redis/stream/wrapper/message.rb,
lib/redis/stream/wrapper/version.rb,
lib/redis/stream/wrapper/exceptions.rb
Defined Under Namespace
Classes: Message, StreamReadError
Constant Summary collapse
- VERSION =
"0.1.6"
Instance Method Summary collapse
-
#ack_message(group, message) ⇒ Object
ACK stream message.
-
#add_message(message) ⇒ Object
Adds a new message to the stream.
-
#clear_stream!(stream_name) ⇒ Object
Deletes the stream.
-
#create_group(name, stream, start = '$', create_default_stream = true) ⇒ Object
Create group stream message.
-
#delete_group(name, stream) ⇒ Object
Delete group stream message.
-
#delete_group_consumer(name, stream, consumer) ⇒ Object
Delete group stream message.
-
#delete_message(message) ⇒ Object
Delete stream message.
-
#info(type, key, group = nil) ⇒ Object
Get info about streams / groups and consumers.
-
#initialize(redis, read_timeout_ms = 1000) ⇒ Wrapper
constructor
Creates a new instance if a Stream.
-
#listen(group, consumer_name, streams, opts = {}) ⇒ Object
Starts reading stream messages looping.
-
#read(group, consumer_name, streams, opts = {}) ⇒ Object
Starts reading stream messages.
-
#stop_listening ⇒ Object
Stops reading message stream(s).
Constructor Details
#initialize(redis, read_timeout_ms = 1000) ⇒ Wrapper
Creates a new instance if a Stream.
12 13 14 15 16 |
# File 'lib/redis/stream/wrapper.rb', line 12 def initialize(redis, read_timeout_ms = 1000) @redis = redis @listening = false @read_timeout_ms = read_timeout_ms end |
Instance Method Details
#ack_message(group, message) ⇒ Object
ACK stream message.
79 80 81 |
# File 'lib/redis/stream/wrapper.rb', line 79 def (group, ) @redis.xack(.stream, group, .id) end |
#add_message(message) ⇒ Object
Adds a new message to the stream.
31 32 33 |
# File 'lib/redis/stream/wrapper.rb', line 31 def () (, @redis.xadd(.stream, .payload, id: .id)) end |
#clear_stream!(stream_name) ⇒ Object
Deletes the stream.
22 23 24 |
# File 'lib/redis/stream/wrapper.rb', line 22 def clear_stream!(stream_name) @redis.del(stream_name) end |
#create_group(name, stream, start = '$', create_default_stream = true) ⇒ Object
Create group stream message.
98 99 100 |
# File 'lib/redis/stream/wrapper.rb', line 98 def create_group(name, stream, start = '$', create_default_stream = true) @redis.xgroup(:create, stream, name, start, mkstream: create_default_stream) end |
#delete_group(name, stream) ⇒ Object
Delete group stream message.
117 118 119 |
# File 'lib/redis/stream/wrapper.rb', line 117 def delete_group(name, stream) @redis.xgroup(:destroy, stream, name) end |
#delete_group_consumer(name, stream, consumer) ⇒ Object
Delete group stream message.
127 128 129 |
# File 'lib/redis/stream/wrapper.rb', line 127 def delete_group_consumer(name, stream, consumer) @redis.xgroup(:delconsumer, stream, name, consumer) end |
#delete_message(message) ⇒ Object
Delete stream message.
87 88 89 |
# File 'lib/redis/stream/wrapper.rb', line 87 def () @redis.xdel(.stream, .id) end |
#info(type, key, group = nil) ⇒ Object
Get info about streams / groups and consumers.
108 109 110 |
# File 'lib/redis/stream/wrapper.rb', line 108 def info(type, key, group = nil) @redis.xinfo(type, key, group) end |
#listen(group, consumer_name, streams, opts = {}) ⇒ Object
Starts reading stream messages looping
42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/redis/stream/wrapper.rb', line 42 def listen(group, consumer_name, streams, opts = {}) raise StreamReadError, "Already listening [#{stream}] stream" if @listening @listening = true opts[:block] = @read_timeout_ms if opts[:block].nil? while @listening results = @redis.xreadgroup(group, consumer_name, streams.keys, streams.values, opts) next unless results parse_read_response(results).each do || yield end end end |
#read(group, consumer_name, streams, opts = {}) ⇒ Object
Starts reading stream messages.
64 65 66 67 68 69 70 71 72 |
# File 'lib/redis/stream/wrapper.rb', line 64 def read(group, consumer_name, streams, opts = {}) opts[:block] = @read_timeout_ms if opts[:block].nil? results = @redis.xreadgroup(group, consumer_name, streams.keys, streams.values, opts) return unless results parse_read_response(results).each.map do || end end |
#stop_listening ⇒ Object
Stops reading message stream(s)
133 134 135 |
# File 'lib/redis/stream/wrapper.rb', line 133 def stop_listening @listening = false end |