Class: RubyReactor::Storage::RedisAdapter
- Defined in:
- lib/ruby_reactor/storage/redis_adapter.rb
Instance Method Summary collapse
- #decrement_map_counter(map_id, reactor_class_name) ⇒ Object
- #expire(key, seconds) ⇒ Object
- #increment_last_queued_index(map_id, reactor_class_name) ⇒ Object
- #increment_map_counter(map_id, reactor_class_name) ⇒ Object
- #initialize(redis_config) ⇒ RedisAdapter (constructor): A new instance of RedisAdapter.
- #initialize_map_operation(map_id, count, parent_reactor_class_name, reactor_class_info:, strict_ordering: true) ⇒ Object
- #publish(channel, message) ⇒ Object
- #retrieve_context(context_id, reactor_class_name) ⇒ Object
- #retrieve_map_metadata(map_id, reactor_class_name) ⇒ Object
- #retrieve_map_results(map_id, reactor_class_name, strict_ordering: true) ⇒ Object
- #set_last_queued_index(map_id, index, reactor_class_name) ⇒ Object
- #set_map_counter(map_id, count, reactor_class_name) ⇒ Object
- #store_context(context_id, serialized_context, reactor_class_name) ⇒ Object
- #store_map_result(map_id, index, serialized_result, reactor_class_name, strict_ordering: true) ⇒ Object
- #subscribe(channel, &block) ⇒ Object
Constructor Details
#initialize(redis_config) ⇒ RedisAdapter
Returns a new instance of RedisAdapter.
9 10 11 12 |
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 9 def initialize(redis_config) super() @redis = Redis.new(redis_config) end |
Instance Method Details
#decrement_map_counter(map_id, reactor_class_name) ⇒ Object
93 94 95 96 |
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 93 def decrement_map_counter(map_id, reactor_class_name) key = map_counter_key(map_id, reactor_class_name) @redis.decr(key) end |
#expire(key, seconds) ⇒ Object
117 118 119 |
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 117 def expire(key, seconds) @redis.expire(key, seconds) end |
#increment_last_queued_index(map_id, reactor_class_name) ⇒ Object
104 105 106 107 |
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 104 def increment_last_queued_index(map_id, reactor_class_name) key = map_last_queued_index_key(map_id, reactor_class_name) @redis.incr(key) end |
#increment_map_counter(map_id, reactor_class_name) ⇒ Object
87 88 89 90 91 |
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 87 def increment_map_counter(map_id, reactor_class_name) key = map_counter_key(map_id, reactor_class_name) @redis.incr(key) @redis.expire(key, 86_400) end |
#initialize_map_operation(map_id, count, parent_reactor_class_name, reactor_class_info:, strict_ordering: true) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 63 def initialize_map_operation(map_id, count, parent_reactor_class_name, reactor_class_info:, strict_ordering: true) # Ensure counter is set set_map_counter(map_id, count, parent_reactor_class_name) # Store metadata key = "reactor:#{parent_reactor_class_name}:map:#{map_id}:metadata" metadata = { count: count, strict_ordering: strict_ordering, reactor_class_info: reactor_class_info, created_at: Time.now.to_i } @redis.call("JSON.SET", key, ".", metadata.to_json) @redis.expire(key, 86_400) end |
#publish(channel, message) ⇒ Object
113 114 115 |
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 113 def publish(channel, message) @redis.publish(channel, message) end |
#retrieve_context(context_id, reactor_class_name) ⇒ Object
21 22 23 24 25 26 27 |
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 21 def retrieve_context(context_id, reactor_class_name) key = context_key(context_id, reactor_class_name) json = @redis.call("JSON.GET", key) return nil unless json JSON.parse(json) end |
#retrieve_map_metadata(map_id, reactor_class_name) ⇒ Object
79 80 81 82 83 84 85 |
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 79 def retrieve_map_metadata(map_id, reactor_class_name) key = "reactor:#{reactor_class_name}:map:#{map_id}:metadata" json = @redis.call("JSON.GET", key) return nil unless json JSON.parse(json) end |
#retrieve_map_results(map_id, reactor_class_name, strict_ordering: true) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 44 def retrieve_map_results(map_id, reactor_class_name, strict_ordering: true) key = map_results_key(map_id, reactor_class_name) if strict_ordering results = @redis.hgetall(key) # Sort by index (key) results.keys.sort_by(&:to_i).map { |k| JSON.parse(results[k]) } else results = @redis.lrange(key, 0, -1) results.map { |r| JSON.parse(r) } end end |
#set_last_queued_index(map_id, index, reactor_class_name) ⇒ Object
98 99 100 101 102 |
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 98 def set_last_queued_index(map_id, index, reactor_class_name) key = map_last_queued_index_key(map_id, reactor_class_name) @redis.set(key, index) @redis.expire(key, 86_400) end |
#set_map_counter(map_id, count, reactor_class_name) ⇒ Object
57 58 59 60 61 |
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 57 def set_map_counter(map_id, count, reactor_class_name) key = map_counter_key(map_id, reactor_class_name) @redis.set(key, count) @redis.expire(key, 86_400) end |
#store_context(context_id, serialized_context, reactor_class_name) ⇒ Object
14 15 16 17 18 19 |
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 14 def store_context(context_id, serialized_context, reactor_class_name) key = context_key(context_id, reactor_class_name) # Use JSON.SET for efficient storage and retrieval @redis.call("JSON.SET", key, ".", serialized_context) @redis.expire(key, 86_400) # 24h TTL end |
#store_map_result(map_id, index, serialized_result, reactor_class_name, strict_ordering: true) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 29 def store_map_result(map_id, index, serialized_result, reactor_class_name, strict_ordering: true) key = map_results_key(map_id, reactor_class_name) if strict_ordering # Use Hash for strict ordering by index # HSET key index serialized_result @redis.hset(key, index.to_s, serialized_result.to_json) else # Loose ordering: just push to list @redis.rpush(key, serialized_result.to_json) end @redis.expire(key, 86_400) end |
#subscribe(channel, &block) ⇒ Object
109 110 111 |
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 109 def subscribe(channel, &block) @redis.subscribe(channel, &block) end |