Module: Protocol::Redis::Cluster::Methods::Streams
- Defined in:
- lib/protocol/redis/cluster/methods/streams.rb
Overview
Provides Redis Streams commands for cluster environments. Stream operations are routed to the appropriate shard based on the stream key.
Instance Method Summary collapse
-
#xack(key, *arguments, role: :master) ⇒ Object
Acknowledge processed messages in a consumer group.
-
#xadd(key, *arguments, role: :master) ⇒ Object
Append a new entry to a stream.
-
#xclaim(key, *arguments, role: :master) ⇒ Object
Change ownership of messages in a consumer group.
-
#xdel(key, *arguments, role: :master) ⇒ Object
Remove specified entries from the stream.
-
#xgroup(*arguments, role: :master) ⇒ Object
Create, destroy, and manage consumer groups.
-
#xinfo(*arguments, role: :master) ⇒ Object
Get information on streams and consumer groups.
-
#xlen(key, role: :master) ⇒ Object
Return the number of entries in a stream.
-
#xpending(key, *arguments, role: :master) ⇒ Object
Get information about pending messages in a consumer group.
-
#xrange(key, *arguments, role: :master) ⇒ Object
Return a range of elements in a stream.
-
#xread(*arguments, role: :master) ⇒ Object
Read new entries from multiple streams.
-
#xreadgroup(*arguments, role: :master) ⇒ Object
Read new entries from streams using a consumer group.
-
#xrevrange(key, *arguments, role: :master) ⇒ Object
Return a range of elements in a stream in reverse order.
-
#xtrim(key, *arguments, role: :master) ⇒ Object
Trim the stream to a certain size.
Instance Method Details
#xack(key, *arguments, role: :master) ⇒ Object
Acknowledge processed messages in a consumer group.
173 174 175 176 177 178 |
# File 'lib/protocol/redis/cluster/methods/streams.rb', line 173 def xack(key, *arguments, role: :master) slot = slot_for(key) client = client_for(slot, role) return client.call("XACK", key, *arguments) end |
#xadd(key, *arguments, role: :master) ⇒ Object
Append a new entry to a stream.
37 38 39 40 41 42 |
# File 'lib/protocol/redis/cluster/methods/streams.rb', line 37 def xadd(key, *arguments, role: :master) slot = slot_for(key) client = client_for(slot, role) return client.call("XADD", key, *arguments) end |
#xclaim(key, *arguments, role: :master) ⇒ Object
Change ownership of messages in a consumer group.
185 186 187 188 189 190 |
# File 'lib/protocol/redis/cluster/methods/streams.rb', line 185 def xclaim(key, *arguments, role: :master) slot = slot_for(key) client = client_for(slot, role) return client.call("XCLAIM", key, *arguments) end |
#xdel(key, *arguments, role: :master) ⇒ Object
Remove specified entries from the stream.
61 62 63 64 65 66 |
# File 'lib/protocol/redis/cluster/methods/streams.rb', line 61 def xdel(key, *arguments, role: :master) slot = slot_for(key) client = client_for(slot, role) return client.call("XDEL", key, *arguments) end |
#xgroup(*arguments, role: :master) ⇒ Object
Create, destroy, and manage consumer groups.
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/protocol/redis/cluster/methods/streams.rb', line 128 def xgroup(*arguments, role: :master) # Extract stream key (usually third argument for CREATE, second for others): stream_key = case arguments[0]&.upcase when "CREATE", "SETID" arguments[1] # CREATE stream group id, SETID stream group id when "DESTROY", "DELCONSUMER" arguments[1] # DESTROY stream group, DELCONSUMER stream group consumer else arguments[1] if arguments.length > 1 end if stream_key slot = slot_for(stream_key) client = client_for(slot, role) else client = any_client(role) end return client.call("XGROUP", *arguments) end |
#xinfo(*arguments, role: :master) ⇒ Object
Get information on streams and consumer groups.
17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/protocol/redis/cluster/methods/streams.rb', line 17 def xinfo(*arguments, role: :master) # Extract stream key (usually the second argument after subcommand): stream_key = arguments[1] if arguments.length > 1 if stream_key slot = slot_for(stream_key) client = client_for(slot, role) else # Fallback for commands without a specific key: client = any_client(role) end return client.call("XINFO", *arguments) end |
#xlen(key, role: :master) ⇒ Object
Return the number of entries in a stream.
96 97 98 99 100 101 |
# File 'lib/protocol/redis/cluster/methods/streams.rb', line 96 def xlen(key, role: :master) slot = slot_for(key) client = client_for(slot, role) return client.call("XLEN", key) end |
#xpending(key, *arguments, role: :master) ⇒ Object
Get information about pending messages in a consumer group.
197 198 199 200 201 202 |
# File 'lib/protocol/redis/cluster/methods/streams.rb', line 197 def xpending(key, *arguments, role: :master) slot = slot_for(key) client = client_for(slot, role) return client.call("XPENDING", key, *arguments) end |
#xrange(key, *arguments, role: :master) ⇒ Object
Return a range of elements in a stream.
73 74 75 76 77 78 |
# File 'lib/protocol/redis/cluster/methods/streams.rb', line 73 def xrange(key, *arguments, role: :master) slot = slot_for(key) client = client_for(slot, role) return client.call("XRANGE", key, *arguments) end |
#xread(*arguments, role: :master) ⇒ Object
Read new entries from multiple streams. Note: In cluster mode, all streams in a single XREAD must be on the same shard.
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/protocol/redis/cluster/methods/streams.rb', line 108 def xread(*arguments, role: :master) # Extract first stream key to determine shard: streams_index = arguments.index("STREAMS") if streams_index && streams_index + 1 < arguments.length first_stream_key = arguments[streams_index + 1] slot = slot_for(first_stream_key) client = client_for(slot, role) else # Fallback if STREAMS keyword not found: client = any_client(role) end return client.call("XREAD", *arguments) end |
#xreadgroup(*arguments, role: :master) ⇒ Object
Read new entries from streams using a consumer group.
153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/protocol/redis/cluster/methods/streams.rb', line 153 def xreadgroup(*arguments, role: :master) # Extract first stream key to determine shard: streams_index = arguments.index("STREAMS") if streams_index && streams_index + 1 < arguments.length first_stream_key = arguments[streams_index + 1] slot = slot_for(first_stream_key) client = client_for(slot, role) else client = any_client(role) end return client.call("XREADGROUP", *arguments) end |
#xrevrange(key, *arguments, role: :master) ⇒ Object
Return a range of elements in a stream in reverse order.
85 86 87 88 89 90 |
# File 'lib/protocol/redis/cluster/methods/streams.rb', line 85 def xrevrange(key, *arguments, role: :master) slot = slot_for(key) client = client_for(slot, role) return client.call("XREVRANGE", key, *arguments) end |
#xtrim(key, *arguments, role: :master) ⇒ Object
Trim the stream to a certain size.
49 50 51 52 53 54 |
# File 'lib/protocol/redis/cluster/methods/streams.rb', line 49 def xtrim(key, *arguments, role: :master) slot = slot_for(key) client = client_for(slot, role) return client.call("XTRIM", key, *arguments) end |