Class: Redis::Stream::Wrapper

Inherits:
Object
  • Object
show all
Defined in:
lib/redis/stream/wrapper.rb,
lib/redis/stream/wrapper/message.rb,
lib/redis/stream/wrapper/version.rb,
lib/redis/stream/wrapper/exceptions.rb

Defined Under Namespace

Classes: Message, StreamReadError

Constant Summary collapse

VERSION =
"0.1.5"

Instance Method Summary collapse

Constructor Details

#initialize(redis, read_timeout_ms = 1000) ⇒ Wrapper

Creates a new instance if a Stream.



12
13
14
15
16
# File 'lib/redis/stream/wrapper.rb', line 12

def initialize(redis, read_timeout_ms = 1000)
  @redis = redis
  @listening = false
  @read_timeout_ms = read_timeout_ms
end

Instance Method Details

#ack_message(group, message) ⇒ Object

ACK stream message.



79
80
81
# File 'lib/redis/stream/wrapper.rb', line 79

def ack_message(group, message)
  @redis.xack(message.stream, group, message.id)
end

#add_message(message) ⇒ Object

Adds a new message to the stream.



31
32
33
# File 'lib/redis/stream/wrapper.rb', line 31

def add_message(message)
  copy_message(message, @redis.xadd(message.stream, message.payload, id: message.id))
end

#clear_stream!(stream_name) ⇒ Object

Deletes the stream.



22
23
24
# File 'lib/redis/stream/wrapper.rb', line 22

def clear_stream!(stream_name)
  @redis.del(stream_name)
end

#create_group(name, stream, start = '$', create_default_stream = true) ⇒ Object

Create group stream message.



98
99
100
# File 'lib/redis/stream/wrapper.rb', line 98

def create_group(name, stream, start = '$', create_default_stream = true)
  @redis.xgroup(:create, stream, name, start, mkstream: create_default_stream)
end

#delete_group(name, stream) ⇒ Object

Delete group stream message.



117
118
119
# File 'lib/redis/stream/wrapper.rb', line 117

def delete_group(name, stream)
  @redis.xgroup(:destroy, stream, name)
end

#delete_group_consumer(name, stream, consumer) ⇒ Object

Delete group stream message.



127
128
129
# File 'lib/redis/stream/wrapper.rb', line 127

def delete_group_consumer(name, stream, consumer)
  @redis.xgroup(:delconsumer, stream, name, consumer)
end

#delete_message(message) ⇒ Object

Delete stream message.



87
88
89
# File 'lib/redis/stream/wrapper.rb', line 87

def delete_message(message)
  @redis.xdel(message.stream, message.id)
end

#info(type, key, group = nil) ⇒ Object

Get info about streams / groups and consumers.



108
109
110
# File 'lib/redis/stream/wrapper.rb', line 108

def info(type, key, group = nil)
  @redis.xinfo(type, key, group)
end

#listen(group, consumer_name, streams, opts = {}) ⇒ Object

Starts reading stream messages looping

Raises:



42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/redis/stream/wrapper.rb', line 42

def listen(group, consumer_name, streams, opts = {})
  raise StreamReadError, "Already listening [#{stream}] stream" if @listening

  @listening = true
  opts[:block] = @read_timeout_ms if opts[:block].nil?
  while @listening
    results = @redis.xreadgroup(group, consumer_name, streams.keys, streams.values, opts)
    next unless results

    parse_read_response(results).each do |message|
      yield message
    end
  end
end

#read(group, consumer_name, streams, opts = {}) ⇒ Object

Starts reading stream messages.



64
65
66
67
68
69
70
71
72
# File 'lib/redis/stream/wrapper.rb', line 64

def read(group, consumer_name, streams, opts = {})
  opts[:block] = @read_timeout_ms if opts[:block].nil?
  results = @redis.xreadgroup(group, consumer_name, streams.keys, streams.values, opts)
  return unless results

  parse_read_response(results).each.map do |message|
    message
  end
end

#stop_listeningObject

Stops reading message stream(s)



133
134
135
# File 'lib/redis/stream/wrapper.rb', line 133

def stop_listening
  @listening = false
end