Class: Redis::Stream::Client
- Inherits:
-
Object
- Object
- Redis::Stream::Client
- Includes:
- Inspect
- Defined in:
- lib/redis/stream/client.rb
Instance Attribute Summary collapse
-
#cache ⇒ Object
readonly
Returns the value of attribute cache.
-
#consumer_id ⇒ Object
readonly
Returns the value of attribute consumer_id.
-
#group ⇒ Object
readonly
Returns the value of attribute group.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#non_blocking ⇒ Object
readonly
Returns the value of attribute non_blocking.
-
#redis_pool ⇒ Object
readonly
Returns the value of attribute redis_pool.
-
#stream ⇒ Object
readonly
Returns the value of attribute stream.
Instance Method Summary collapse
-
#add(data = {}, options = {}) ⇒ Object
add: add a message to the stream. There is no passthrough variable here.
-
#initialize(stream_name, group_name = nil, name = rand(36 ** 7).to_s(36), options = {}) ⇒ Client
constructor
Initialize: set up rstream. Example: Redis::Stream::Client.new("resolver", "stream", "resolver-1", "logger" => Logger.new(STDOUT)). If group is nil or not supplied, no rstream group will be set up.
-
#on_message(&block) ⇒ Object
on_message: execute this block every time a new message is received.
-
#running? ⇒ Boolean
running?: Are we still in the running state.
-
#sanitize ⇒ Object
Remove dead and non-existent consumers and groups.
-
#start(block = true, passthrough = false) ⇒ Object
start: start listening for stream messages.
-
#stop ⇒ Object
stop: stop listening for new messages.
-
#sync_add(data = {}, options = {}) ⇒ Object
sync_add: same as add command but synchronous.
- #trace(operation_name, parent_scope = nil) ⇒ Object
Methods included from Inspect
#consumers, #del_consumer, #del_group, #groups, #info, #pending_messages
Constructor Details
#initialize(stream_name, group_name = nil, name = rand(36 ** 7).to_s(36), options = {}) ⇒ Client
Initialize: set up rstream. Example: Redis::Stream::Client.new("resolver", "stream", "resolver-1", "logger" => Logger.new(STDOUT)). If group is nil or not supplied, no rstream group will be set up.
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/redis/stream/client.rb', line 27

# Initialize: set up the stream client.
#
# Merges the caller's options over the defaults, builds the Redis connection
# pool, configures the global OpenTracing tracer and calls setup_stream.
#
# The Redis connection is chosen in this order:
# 1. options['redis'] when that key is present,
# 2. the :redis entry of the config file when Redis::Stream::Config has one,
# 3. a new connection built from the "host", "port" and "db" options.
#
# @param stream_name [String] name of the stream
# @param group_name [String, nil] name of the group; per the method summary,
#   no rstream group is set up when this is nil or not supplied
# @param name [String] client name; defaults to a random base-36 string
# @param options [Hash] string-keyed options. Defaults exist for "host",
#   "port", "db", "config_file_path", "logger" and "tracer"; "redis",
#   "caching" and "sync_start" are also read when present.
# @raise [RuntimeError] "No redis" when the pool is nil or has no available
#   connection
def initialize(stream_name, group_name = nil, name = rand(36 ** 7).to_s(36), options = {})
  default_options = {"host" => "127.0.0.1",
                     "port" => 6379,
                     "db" => 0,
                     "config_file_path" => '.',
                     "logger" => Logger.new(STDOUT),
                     "tracer" => nil}
  options = default_options.merge(options)

  Redis::Stream::Config.path = options['config_file_path']

  host = options["host"]
  port = options["port"]
  db = options["db"]
  @logger = options["logger"]
  # A data cache is only created when the caller passes a truthy 'caching' option.
  @cache = options.include?('caching') && options['caching'] ? Redis::Stream::DataCache.new : nil

  @name = name
  @state = Redis::Stream::State::IDLE
  @stream = stream_name
  @group = group_name

  if options.include?('redis')
    @logger.info("Taking REDIS as a parameter")
    # NOTE(review): the block returns the same object on every call, so all
    # pool slots share the one caller-supplied connection — confirm intended.
    @redis_pool = ConnectionPool.new(size: 10, timeout: 5) { options['redis'] }
  elsif Redis::Stream::Config.file_exists? && Redis::Stream::Config.include?(:redis)
    @logger.info("Taking REDIS from config file")
    @redis_pool = ConnectionPool.new(size: 10, timeout: 5) { Redis.new(Redis::Stream::Config[:redis]) }
  else
    @logger.info("Instantiating REDIS")
    @redis_pool = ConnectionPool.new(size: 10, timeout: 5) { Redis.new(host: host, port: port, db: db) }
  end

  @consumer_id = "#{@name}-#{@group}-#{Process.pid}"
  @non_blocking = nil
  # @send_queue = []

  raise "No redis" if @redis_pool.nil? || @redis_pool.available == 0

  # An explicitly supplied tracer wins over the :zipkin entry of the config file.
  if options.has_key?('tracer') && !options['tracer'].nil?
    OpenTracing.global_tracer = options["tracer"]
  elsif Redis::Stream::Config.include?(:zipkin)
    OpenTracing.global_tracer = Zipkin::Tracer.build(url: Redis::Stream::Config[:zipkin],
                                                     service_name: "#{@name}-#{@group}",
                                                     flush_interval: 1)
  end

  # With "sync_start" the client is marked RUNNING right away, without start being called.
  @state = Redis::Stream::State::RUNNING if options.include?("sync_start") && options["sync_start"]
  setup_stream

  # Fall back to '0' when the stream info cannot be read.
  @last_id = begin
    info['last-generated-id']
  rescue StandardError
    '0'
  end
  @logger.info "#{@consumer_id} - Last ID = #{@last_id}"
end
Instance Attribute Details
#cache ⇒ Object (readonly)
Returns the value of attribute cache.
17 18 19 |
# File 'lib/redis/stream/client.rb', line 17

# Returns the value of attribute cache.
#
# @return [Object, nil] the Redis::Stream::DataCache created by the
#   constructor when the 'caching' option is truthy, otherwise nil
def cache
  @cache
end
#consumer_id ⇒ Object (readonly)
Returns the value of attribute consumer_id.
17 18 19 |
# File 'lib/redis/stream/client.rb', line 17

# Returns the value of attribute consumer_id.
#
# @return [String] the identifier the constructor builds as
#   "<name>-<group>-<process id>"
def consumer_id
  @consumer_id
end
#group ⇒ Object (readonly)
Returns the value of attribute group.
17 18 19 |
# File 'lib/redis/stream/client.rb', line 17

# Returns the value of attribute group.
#
# @return [Object, nil] the group_name passed to the constructor (nil when
#   none was supplied)
def group
  @group
end
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
17 18 19 |
# File 'lib/redis/stream/client.rb', line 17

# Returns the value of attribute logger.
#
# @return [Object] the "logger" option the constructor resolved; the default
#   is Logger.new(STDOUT)
def logger
  @logger
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
17 18 19 |
# File 'lib/redis/stream/client.rb', line 17

# Returns the value of attribute name.
#
# @return [String] the client name passed to the constructor, or the random
#   base-36 default it generated
def name
  @name
end
#non_blocking ⇒ Object (readonly)
Returns the value of attribute non_blocking.
17 18 19 |
# File 'lib/redis/stream/client.rb', line 17

# Returns the value of attribute non_blocking.
#
# @return [Thread, nil] the listener thread created by start when it is
#   called with block = false; nil after construction
def non_blocking
  @non_blocking
end
#redis_pool ⇒ Object (readonly)
Returns the value of attribute redis_pool.
17 18 19 |
# File 'lib/redis/stream/client.rb', line 17

# Returns the value of attribute redis_pool.
#
# @return [Object] the ConnectionPool the constructor built around the Redis
#   connection(s)
def redis_pool
  @redis_pool
end
#stream ⇒ Object (readonly)
Returns the value of attribute stream.
17 18 19 |
# File 'lib/redis/stream/client.rb', line 17

# Returns the value of attribute stream.
#
# @return [Object] the stream_name passed to the constructor
def stream
  @stream
end
Instance Method Details
#add(data = {}, options = {}) ⇒ Object
add: add a message to the stream. There is no passthrough variable here; the passthrough is available in the start method.
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/redis/stream/client.rb', line 82

# add: add a message to the stream.
#
# The caller's options are merged over the defaults, a payload is built with
# build_payload and appended to the stream with XADD inside an 'add' tracing
# span. There is no passthrough option here; passthrough belongs to start.
#
# @param data [Hash] message data handed to build_payload
# @param options [Hash] string-keyed options; defaults are "to" => "*",
#   "group" => "*", "type" => Redis::Stream::Type::ACTION,
#   "cache_key" => nil and "tracer" => nil
# @return [Object] the id returned by redis.xadd
# @raise [RuntimeError] "Client isn't running" unless the client state is RUNNING
def add(data = {}, options = {})
  raise "Client isn't running" unless @state.eql?(Redis::Stream::State::RUNNING)

  add_id = nil
  OpenTracing.start_active_span('add') do |_scope|
    default_options = {"to" => "*",
                       "group" => "*",
                       "type" => Redis::Stream::Type::ACTION,
                       "cache_key" => nil,
                       "tracer" => nil}
    options = default_options.merge(options)

    type = options["type"]
    to = options["to"]
    group = options["group"]
    payload = build_payload(data, options)

    # Borrow a connection from the pool only for the duration of the XADD.
    @redis_pool.with do |redis|
      add_id = redis.xadd(@stream, payload)
    end

    @logger.info("#{@consumer_id} - send to '#{to}' in group '#{group}' with id '#{add_id}' of type '#{type}'")
  end

  add_id
end
#on_message(&block) ⇒ Object
on_message: execute this block every time a new message is received
137 138 139 |
# Stores the given block; per the method summary it is executed every time a
# new message is received.
# NOTE(review): extraction dropped two identifiers from this listing: the
# method name after `def` (on_message, per the signature heading) and the
# variable the block is assigned to. The assignment target cannot be
# recovered from this page; restore it from lib/redis/stream/client.rb.
# File 'lib/redis/stream/client.rb', line 137 def (&block) = block end |
#running? ⇒ Boolean
running?: Are we still in the running state
174 175 176 177 |
# File 'lib/redis/stream/client.rb', line 174

# running?: are we still in the running state?
#
# When a listener thread exists (@non_blocking is set) it must still be alive;
# when there is none, only the state is checked.
#
# @return [Boolean] true when the state is RUNNING and the listener thread,
#   if any, is alive
def running?
  listener_ok = @non_blocking.nil? || @non_blocking.alive?
  listener_ok && @state.eql?(Redis::Stream::State::RUNNING)
end
#sanitize ⇒ Object
Remove dead and non-existent consumers and groups
180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/redis/stream/client.rb', line 180

# Remove consumers that no longer answer.
#
# Every consumer of every group, except this client itself, is sent a PING
# through sync_add with a one second time-out; a consumer whose ping returns
# nil is deleted with del_consumer.
# NOTE(review): the summary also mentions removing groups, but this method
# only deletes consumers.
#
# @return [Object] whatever groups.each returns
def sanitize
  groups.each do |group|
    consumers(group["name"]).each do |consumer|
      # Never ping (or delete) ourselves.
      next if @consumer_id == consumer["name"]

      result = sync_add({},
                        "to" => consumer["name"],
                        "group" => group["name"],
                        "type" => Redis::Stream::Type::PING,
                        "time_out" => 1)
      del_consumer(group['name'], consumer['name']) if result.nil?
    end
  end
end
#start(block = true, passthrough = false) ⇒ Object
start: start listening for stream messages
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# start: start listening for stream messages.
# Raises "<consumer_id> already running" when the state is already RUNNING,
# otherwise sets the state to RUNNING. With block = true the read loop runs in
# the calling thread until the state leaves RUNNING; with block = false the
# same loop runs in a new Thread stored in @non_blocking, which logs
# "ending thread" when the loop finishes. The sanitize call is commented out.
# NOTE(review): extraction dropped the name of the method invoked inside both
# loops: the bare `(true, passthrough)` was a call whose receiver-less name is
# missing. It cannot be recovered from this page; restore it from
# lib/redis/stream/client.rb.
# File 'lib/redis/stream/client.rb', line 145 def start(block = true, passthrough = false) raise "#{@consumer_id} already running" if @state == Redis::Stream::State::RUNNING @state = Redis::Stream::State::RUNNING #sanitize if block while @state == Redis::Stream::State::RUNNING (true, passthrough) end else @non_blocking = Thread.new do while @state == Redis::Stream::State::RUNNING (true, passthrough) end @logger.info("#{@consumer_id} - ending thread") end end end |
#stop ⇒ Object
stop: stop listening for new messages
164 165 166 167 168 169 170 171 |
# File 'lib/redis/stream/client.rb', line 164

# stop: stop listening for new messages.
#
# Sets the state to STOPPED so the read loops in start terminate, waits for
# the listener thread (if any) to finish, and always removes this consumer
# and group afterwards via del_consumer and del_group.
#
# @return [Thread, nil] the joined listener thread, or nil when no join
#   took place
def stop
  @state = Redis::Stream::State::STOPPED
  @logger.info("#{@consumer_id} - stopping")

  # A thread cannot join itself (Thread#join raises ThreadError), so skip the
  # join when stop is called from inside the listener thread; its loop ends
  # on its own because the state is no longer RUNNING.
  @non_blocking.join unless @non_blocking.nil? || @non_blocking.equal?(Thread.current)
ensure
  del_consumer
  del_group
end
#sync_add(data = {}, options = {}) ⇒ Object
sync_add: same as the add command but synchronous. Blocks the call until a message arrives or the time-out expires
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# sync_add: same as add, but synchronous.
# Raises "Client isn't running" unless the state is RUNNING, merges the
# caller's options over the defaults ("to", "group", "type", "time_out" => 5,
# "passthrough" => false, "cache_key", "tracer"), sends the message with add
# and then polls in a loop until the read call returns a truthy value or the
# time-out is exceeded. Returns the received data, or nil after a time-out.
# The elapsed time is truncated with to_i and compared with `>`, so the loop
# can run for almost one second longer than "time_out".
# NOTE(review): extraction dropped identifiers from this listing: the
# options-hash name (`options`, per the signature heading), the local holding
# the defaults, and the name of the method invoked as `(false, passthrough)`.
# The last one cannot be recovered from this page; restore it from
# lib/redis/stream/client.rb.
# File 'lib/redis/stream/client.rb', line 106 def sync_add(data = {}, = {}) raise "Client isn't running" unless @state.eql?(Redis::Stream::State::RUNNING) = {"to" => "*", "group" => "*", "type" => Redis::Stream::Type::ACTION, "time_out" => 5, "passthrough" => false, "cache_key" => nil, "tracer" => nil} = .merge() to = ["to"] group = ["group"] passthrough = ["passthrough"] time_out = ["time_out"] #@state = Redis::Stream::State::RUNNING data_out = nil add_id = add(data, "to" => to, "group" => group, "type" => ["type"], "cache_key" => ["cache_key"], "tracer" => ['tracer']) time = Time.now loop do timing = ((Time.now - time)).to_i if timing > time_out @logger.info("#{@consumer_id} - Time out(#{time_out}) for '#{to}' in group '#{group}'") #@send_queue.delete(add_id) if @send_queue.include?(add_id) break end break if (data_out = (false, passthrough)) end #@state = Redis::Stream::State::STOPPED data_out end |
#trace(operation_name, parent_scope = nil) ⇒ Object
193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 |
# File 'lib/redis/stream/client.rb', line 193

# Run the given block inside an active OpenTracing span.
#
# When parent_scope is a Zipkin::Scope the new span is started as a child of
# that scope's span; any other value (including nil) starts a span without an
# explicit parent.
#
# @param operation_name [String] name of the span
# @param parent_scope [Zipkin::Scope, nil] scope whose span becomes the parent
# @yield [scope] the active scope, when a block is given
# @return [Object] whatever OpenTracing.start_active_span returns
def trace(operation_name, parent_scope = nil)
  span_options = parent_scope.is_a?(Zipkin::Scope) ? { child_of: parent_scope.span } : {}

  OpenTracing.start_active_span(operation_name, **span_options) do |scope|
    yield scope if block_given?
  end
end