Module: Protocol::Redis::Cluster::Methods::Streams

Defined in:
lib/protocol/redis/cluster/methods/streams.rb

Overview

Provides Redis Streams commands for cluster environments. Stream operations are routed to the appropriate shard based on the stream key.

Instance Method Summary

Instance Method Details

#xack(key, *arguments, role: :master) ⇒ Object

Acknowledge processed messages in a consumer group.



173
174
175
176
177
178
# File 'lib/protocol/redis/cluster/methods/streams.rb', line 173

# Acknowledge processed messages in a consumer group.
#
# The stream key selects the cluster slot; the command is then issued on the
# client that `client_for` returns for that slot and role.
#
# @param key the stream key, also sent as the first XACK argument
# @param arguments remaining XACK arguments, forwarded unchanged
# @param role node role handed to `client_for` (defaults to :master)
# @return whatever the selected client's `call` returns
def xack(key, *arguments, role: :master)
  client_for(slot_for(key), role).call("XACK", key, *arguments)
end

#xadd(key, *arguments, role: :master) ⇒ Object

Append a new entry to a stream.



37
38
39
40
41
42
# File 'lib/protocol/redis/cluster/methods/streams.rb', line 37

# Append a new entry to a stream.
#
# The stream key selects the cluster slot; the command is then issued on the
# client that `client_for` returns for that slot and role.
#
# @param key the stream key, also sent as the first XADD argument
# @param arguments remaining XADD arguments, forwarded unchanged
# @param role node role handed to `client_for` (defaults to :master)
# @return whatever the selected client's `call` returns
def xadd(key, *arguments, role: :master)
  client_for(slot_for(key), role).call("XADD", key, *arguments)
end

#xclaim(key, *arguments, role: :master) ⇒ Object

Change ownership of messages in a consumer group.



185
186
187
188
189
190
# File 'lib/protocol/redis/cluster/methods/streams.rb', line 185

# Change ownership of messages in a consumer group.
#
# The stream key selects the cluster slot; the command is then issued on the
# client that `client_for` returns for that slot and role.
#
# @param key the stream key, also sent as the first XCLAIM argument
# @param arguments remaining XCLAIM arguments, forwarded unchanged
# @param role node role handed to `client_for` (defaults to :master)
# @return whatever the selected client's `call` returns
def xclaim(key, *arguments, role: :master)
  client_for(slot_for(key), role).call("XCLAIM", key, *arguments)
end

#xdel(key, *arguments, role: :master) ⇒ Object

Remove specified entries from the stream.



61
62
63
64
65
66
# File 'lib/protocol/redis/cluster/methods/streams.rb', line 61

# Remove specified entries from the stream.
#
# The stream key selects the cluster slot; the command is then issued on the
# client that `client_for` returns for that slot and role.
#
# @param key the stream key, also sent as the first XDEL argument
# @param arguments remaining XDEL arguments, forwarded unchanged
# @param role node role handed to `client_for` (defaults to :master)
# @return whatever the selected client's `call` returns
def xdel(key, *arguments, role: :master)
  client_for(slot_for(key), role).call("XDEL", key, *arguments)
end

#xgroup(*arguments, role: :master) ⇒ Object

Create, destroy, and manage consumer groups.



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/protocol/redis/cluster/methods/streams.rb', line 128

# Create, destroy, and manage consumer groups.
#
# Routing uses the second argument as the stream key: every keyed XGROUP
# subcommand (CREATE, SETID, DESTROY, CREATECONSUMER, DELCONSUMER) places the
# key directly after the subcommand name. When no second argument is given
# (for example a lone subcommand), the command goes to `any_client(role)`.
#
# @param arguments the XGROUP subcommand followed by its arguments, forwarded unchanged
# @param role node role handed to `client_for` / `any_client` (defaults to :master)
# @return whatever the selected client's `call` returns
def xgroup(*arguments, role: :master)
  # The stream key, when present, always directly follows the subcommand, so
  # no per-subcommand dispatch is needed (nil when fewer than two arguments):
  stream_key = arguments[1]
  
  if stream_key
    slot = slot_for(stream_key)
    client = client_for(slot, role)
  else
    # No key to route on:
    client = any_client(role)
  end
  
  return client.call("XGROUP", *arguments)
end

#xinfo(*arguments, role: :master) ⇒ Object

Get information on streams and consumer groups.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/protocol/redis/cluster/methods/streams.rb', line 17

# Get information on streams and consumer groups.
#
# The second argument (the one after the subcommand) is treated as the stream
# key and used for slot routing. When it is absent, the command is sent to
# `any_client(role)` instead.
#
# @param arguments the XINFO subcommand followed by its arguments, forwarded unchanged
# @param role node role handed to `client_for` / `any_client` (defaults to :master)
# @return whatever the selected client's `call` returns
def xinfo(*arguments, role: :master)
  _subcommand, stream_key = arguments
  
  # Route by key when one was given; otherwise there is nothing to hash:
  target = stream_key ? client_for(slot_for(stream_key), role) : any_client(role)
  
  target.call("XINFO", *arguments)
end

#xlen(key, role: :master) ⇒ Object

Return the number of entries in a stream.



96
97
98
99
100
101
# File 'lib/protocol/redis/cluster/methods/streams.rb', line 96

# Return the number of entries in a stream.
#
# The stream key selects the cluster slot; the command is then issued on the
# client that `client_for` returns for that slot and role.
#
# @param key the stream key, also sent as the only XLEN argument
# @param role node role handed to `client_for` (defaults to :master)
# @return whatever the selected client's `call` returns
def xlen(key, role: :master)
  client_for(slot_for(key), role).call("XLEN", key)
end

#xpending(key, *arguments, role: :master) ⇒ Object

Get information about pending messages in a consumer group.



197
198
199
200
201
202
# File 'lib/protocol/redis/cluster/methods/streams.rb', line 197

# Get information about pending messages in a consumer group.
#
# The stream key selects the cluster slot; the command is then issued on the
# client that `client_for` returns for that slot and role.
#
# @param key the stream key, also sent as the first XPENDING argument
# @param arguments remaining XPENDING arguments, forwarded unchanged
# @param role node role handed to `client_for` (defaults to :master)
# @return whatever the selected client's `call` returns
def xpending(key, *arguments, role: :master)
  client_for(slot_for(key), role).call("XPENDING", key, *arguments)
end

#xrange(key, *arguments, role: :master) ⇒ Object

Return a range of elements in a stream.



73
74
75
76
77
78
# File 'lib/protocol/redis/cluster/methods/streams.rb', line 73

# Return a range of elements in a stream.
#
# The stream key selects the cluster slot; the command is then issued on the
# client that `client_for` returns for that slot and role.
#
# @param key the stream key, also sent as the first XRANGE argument
# @param arguments remaining XRANGE arguments, forwarded unchanged
# @param role node role handed to `client_for` (defaults to :master)
# @return whatever the selected client's `call` returns
def xrange(key, *arguments, role: :master)
  client_for(slot_for(key), role).call("XRANGE", key, *arguments)
end

#xread(*arguments, role: :master) ⇒ Object

Read new entries from multiple streams. Note: In cluster mode, all streams in a single XREAD must be on the same shard.



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/protocol/redis/cluster/methods/streams.rb', line 108

# Read new entries from multiple streams.
#
# Note: In cluster mode, all streams in a single XREAD must be on the same
# shard. Only the first stream key (the argument right after the STREAMS
# keyword) is used to pick the slot.
#
# @param arguments the XREAD options, the STREAMS keyword, keys and IDs, forwarded unchanged
# @param role node role handed to `client_for` / `any_client` (defaults to :master)
# @return whatever the selected client's `call` returns
def xread(*arguments, role: :master)
  # Redis matches option keywords case-insensitively, so "streams" must be
  # recognised as well as "STREAMS". `to_s` lets numeric option values such as
  # a COUNT integer be compared without raising:
  streams_index = arguments.index {|argument| argument.to_s.casecmp?("STREAMS")}
  
  if streams_index && streams_index + 1 < arguments.length
    first_stream_key = arguments[streams_index + 1]
    slot = slot_for(first_stream_key)
    client = client_for(slot, role)
  else
    # Fallback if the STREAMS keyword is missing or has no key after it:
    client = any_client(role)
  end
  
  return client.call("XREAD", *arguments)
end

#xreadgroup(*arguments, role: :master) ⇒ Object

Read new entries from streams using a consumer group.



153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/protocol/redis/cluster/methods/streams.rb', line 153

# Read new entries from streams using a consumer group.
#
# Only the first stream key (the argument right after the STREAMS keyword) is
# used to pick the slot. When no such key can be found, the command is sent to
# `any_client(role)`.
#
# @param arguments the XREADGROUP options, the STREAMS keyword, keys and IDs, forwarded unchanged
# @param role node role handed to `client_for` / `any_client` (defaults to :master)
# @return whatever the selected client's `call` returns
def xreadgroup(*arguments, role: :master)
  # Locate the STREAMS keyword. Keywords are compared case-insensitively, as
  # Redis does, and `to_s` lets numeric option values be compared safely.
  streams_index = nil
  position = 0
  
  while position < arguments.length
    token = arguments[position].to_s
    
    if token.casecmp?("STREAMS")
      streams_index = position
      break
    end
    
    # GROUP is followed by the group and consumer names. These are arbitrary
    # strings (possibly "streams"), so step over them rather than inspect them:
    position += token.casecmp?("GROUP") ? 3 : 1
  end
  
  if streams_index && streams_index + 1 < arguments.length
    first_stream_key = arguments[streams_index + 1]
    slot = slot_for(first_stream_key)
    client = client_for(slot, role)
  else
    # Fallback if the STREAMS keyword is missing or has no key after it:
    client = any_client(role)
  end
  
  return client.call("XREADGROUP", *arguments)
end

#xrevrange(key, *arguments, role: :master) ⇒ Object

Return a range of elements in a stream in reverse order.



85
86
87
88
89
90
# File 'lib/protocol/redis/cluster/methods/streams.rb', line 85

# Return a range of elements in a stream in reverse order.
#
# The stream key selects the cluster slot; the command is then issued on the
# client that `client_for` returns for that slot and role.
#
# @param key the stream key, also sent as the first XREVRANGE argument
# @param arguments remaining XREVRANGE arguments, forwarded unchanged
# @param role node role handed to `client_for` (defaults to :master)
# @return whatever the selected client's `call` returns
def xrevrange(key, *arguments, role: :master)
  client_for(slot_for(key), role).call("XREVRANGE", key, *arguments)
end

#xtrim(key, *arguments, role: :master) ⇒ Object

Trim the stream to a certain size.



49
50
51
52
53
54
# File 'lib/protocol/redis/cluster/methods/streams.rb', line 49

# Trim the stream to a certain size.
#
# The stream key selects the cluster slot; the command is then issued on the
# client that `client_for` returns for that slot and role.
#
# @param key the stream key, also sent as the first XTRIM argument
# @param arguments remaining XTRIM arguments, forwarded unchanged
# @param role node role handed to `client_for` (defaults to :master)
# @return whatever the selected client's `call` returns
def xtrim(key, *arguments, role: :master)
  client_for(slot_for(key), role).call("XTRIM", key, *arguments)
end