Class: RubyReactor::Storage::RedisAdapter

Inherits:
Adapter
  • Object
Defined in:
lib/ruby_reactor/storage/redis_adapter.rb

Instance Method Summary

Constructor Details

#initialize(redis_config) ⇒ RedisAdapter

Returns a new instance of RedisAdapter.



9
10
11
12
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 9

# Sets up the adapter with its own Redis client.
#
# @param redis_config [Object] passed unchanged to Redis.new
#   (presumably a hash of connection options -- confirm against callers)
def initialize(redis_config)
  super() # explicit empty parens: the parent initializer receives no arguments
  client = Redis.new(redis_config)
  @redis = client
end

Instance Method Details

#decrement_map_counter(map_id, reactor_class_name) ⇒ Object



93
94
95
96
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 93

# Decrements the counter kept for a map operation.
#
# @param map_id [Object] map identifier, forwarded to map_counter_key
# @param reactor_class_name [Object] forwarded to map_counter_key
# @return [Object] whatever the client's decr call returns
def decrement_map_counter(map_id, reactor_class_name)
  @redis.decr(map_counter_key(map_id, reactor_class_name))
end

#expire(key, seconds) ⇒ Object



117
118
119
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 117

# Applies a time-to-live to a key by delegating to the Redis client.
#
# @param key [Object] key to expire, passed through unchanged
# @param seconds [Object] TTL value, passed through unchanged
# @return [Object] the client's expire result
def expire(key, seconds)
  ttl = seconds
  @redis.expire(key, ttl)
end

#increment_last_queued_index(map_id, reactor_class_name) ⇒ Object



104
105
106
107
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 104

# Increments the "last queued index" value stored for a map operation.
#
# @param map_id [Object] map identifier, forwarded to map_last_queued_index_key
# @param reactor_class_name [Object] forwarded to map_last_queued_index_key
# @return [Object] whatever the client's incr call returns
def increment_last_queued_index(map_id, reactor_class_name)
  @redis.incr(map_last_queued_index_key(map_id, reactor_class_name))
end

#increment_map_counter(map_id, reactor_class_name) ⇒ Object



87
88
89
90
91
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 87

# Increments the counter kept for a map operation and refreshes its TTL.
#
# Returns the incremented value, matching decrement_map_counter and
# increment_last_queued_index, which also hand back the client's counter
# result. Previously the method returned the result of the trailing expire
# call, so the new count was lost to callers.
#
# @param map_id [Object] map identifier, forwarded to map_counter_key
# @param reactor_class_name [Object] forwarded to map_counter_key
# @return [Object] the value returned by the client's incr call
def increment_map_counter(map_id, reactor_class_name)
  key = map_counter_key(map_id, reactor_class_name)
  new_count = @redis.incr(key)
  @redis.expire(key, 86_400) # 86_400 seconds = 24 hours
  new_count
end

#initialize_map_operation(map_id, count, parent_reactor_class_name, reactor_class_info:, strict_ordering: true) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 63

# Prepares the stored state for a map operation: sets its counter and writes
# a JSON metadata document describing it.
#
# @param map_id [Object] map identifier, interpolated into the metadata key
# @param count [Object] value handed to set_map_counter and recorded in the metadata
# @param parent_reactor_class_name [Object] interpolated into the metadata key
# @param reactor_class_info [Object] stored verbatim under :reactor_class_info
# @param strict_ordering [Boolean] stored verbatim under :strict_ordering
# @return [Object] the result of the final expire call
def initialize_map_operation(map_id, count, parent_reactor_class_name, reactor_class_info:, strict_ordering: true)
  # Ensure counter is set
  set_map_counter(map_id, count, parent_reactor_class_name)

  # Store metadata
  key = "reactor:#{parent_reactor_class_name}:map:#{map_id}:metadata"
  metadata = {
    count: count,
    strict_ordering: strict_ordering,
    reactor_class_info: reactor_class_info,
    created_at: Time.now.to_i # epoch seconds
  }
  @redis.call("JSON.SET", key, ".", metadata.to_json)
  @redis.expire(key, 86_400) # 86_400 seconds = 24 hours
end

#publish(channel, message) ⇒ Object



113
114
115
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 113

# Publishes a message on a channel by delegating to the Redis client.
#
# @param channel [Object] channel, passed through unchanged
# @param message [Object] payload, passed through unchanged
# @return [Object] the client's publish result
def publish(channel, message)
  payload = message
  @redis.publish(channel, payload)
end

#retrieve_context(context_id, reactor_class_name) ⇒ Object



21
22
23
24
25
26
27
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 21

# Loads a stored context document and parses it from JSON.
#
# @param context_id [Object] forwarded to context_key
# @param reactor_class_name [Object] forwarded to context_key
# @return [Object, nil] the parsed JSON value, or nil when JSON.GET yields nothing
def retrieve_context(context_id, reactor_class_name)
  raw = @redis.call("JSON.GET", context_key(context_id, reactor_class_name))
  raw ? JSON.parse(raw) : nil
end

#retrieve_map_metadata(map_id, reactor_class_name) ⇒ Object



79
80
81
82
83
84
85
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 79

# Loads the metadata document of a map operation and parses it from JSON.
#
# @param map_id [Object] map identifier, interpolated into the metadata key
# @param reactor_class_name [Object] interpolated into the metadata key
# @return [Object, nil] the parsed JSON value, or nil when JSON.GET yields nothing
def retrieve_map_metadata(map_id, reactor_class_name)
  key = "reactor:#{reactor_class_name}:map:#{map_id}:metadata"
  json = @redis.call("JSON.GET", key)
  return nil unless json

  JSON.parse(json)
end

#retrieve_map_results(map_id, reactor_class_name, strict_ordering: true) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 44

# Reads back every stored result of a map operation, JSON-parsed.
#
# With strict ordering the results are read from a hash and returned sorted
# by the integer value of their field names; otherwise they are read from a
# list and returned in list order.
#
# @param map_id [Object] forwarded to map_results_key
# @param reactor_class_name [Object] forwarded to map_results_key
# @param strict_ordering [Boolean] selects the hash (true) or list (false) layout
# @return [Array] the parsed results
def retrieve_map_results(map_id, reactor_class_name, strict_ordering: true)
  key = map_results_key(map_id, reactor_class_name)

  # Loose ordering: the list already holds results in insertion order.
  return @redis.lrange(key, 0, -1).map { |raw| JSON.parse(raw) } unless strict_ordering

  # Strict ordering: field names are indexes, so compare them numerically.
  by_index = @redis.hgetall(key)
  by_index.sort_by { |index, _raw| index.to_i }.map { |_index, raw| JSON.parse(raw) }
end

#set_last_queued_index(map_id, index, reactor_class_name) ⇒ Object



98
99
100
101
102
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 98

# Overwrites the "last queued index" value of a map operation and gives the
# key a 24-hour TTL.
#
# @param map_id [Object] forwarded to map_last_queued_index_key
# @param index [Object] value to store
# @param reactor_class_name [Object] forwarded to map_last_queued_index_key
# @return [Object] the result of the expire call
def set_last_queued_index(map_id, index, reactor_class_name)
  index_key = map_last_queued_index_key(map_id, reactor_class_name)
  @redis.set(index_key, index)
  @redis.expire(index_key, 86_400) # 86_400 seconds = 24 hours
end

#set_map_counter(map_id, count, reactor_class_name) ⇒ Object



57
58
59
60
61
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 57

# Overwrites the counter of a map operation and gives the key a 24-hour TTL.
#
# @param map_id [Object] forwarded to map_counter_key
# @param count [Object] value to store
# @param reactor_class_name [Object] forwarded to map_counter_key
# @return [Object] the result of the expire call
def set_map_counter(map_id, count, reactor_class_name)
  counter_key = map_counter_key(map_id, reactor_class_name)
  @redis.set(counter_key, count)
  @redis.expire(counter_key, 86_400) # 86_400 seconds = 24 hours
end

#store_context(context_id, serialized_context, reactor_class_name) ⇒ Object



14
15
16
17
18
19
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 14

# Writes a context document with JSON.SET and gives the key a 24-hour TTL.
#
# @param context_id [Object] forwarded to context_key
# @param serialized_context [Object] sent to JSON.SET as-is (no encoding happens here)
# @param reactor_class_name [Object] forwarded to context_key
# @return [Object] the result of the expire call
def store_context(context_id, serialized_context, reactor_class_name)
  storage_key = context_key(context_id, reactor_class_name)
  # "." addresses the document root, so the whole value is replaced.
  @redis.call("JSON.SET", storage_key, ".", serialized_context)
  @redis.expire(storage_key, 86_400) # 24h TTL
end

#store_map_result(map_id, index, serialized_result, reactor_class_name, strict_ordering: true) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 29

# Stores one result of a map operation and gives the results key a 24-hour TTL.
#
# The result is encoded with to_json before it is written. With strict
# ordering it goes into a hash field named after the index; otherwise it is
# appended to a list.
#
# @param map_id [Object] forwarded to map_results_key
# @param index [Object] position of the result; its to_s is the hash field name
# @param serialized_result [Object] value to encode and store
# @param reactor_class_name [Object] forwarded to map_results_key
# @param strict_ordering [Boolean] selects the hash (true) or list (false) layout
# @return [Object] the result of the expire call
def store_map_result(map_id, index, serialized_result, reactor_class_name, strict_ordering: true)
  key = map_results_key(map_id, reactor_class_name)
  payload = serialized_result.to_json

  if strict_ordering
    # Hash layout: HSET key index payload
    @redis.hset(key, index.to_s, payload)
  else
    # List layout: append in arrival order
    @redis.rpush(key, payload)
  end

  @redis.expire(key, 86_400) # 86_400 seconds = 24 hours
end

#subscribe(channel, &block) ⇒ Object



109
110
111
# File 'lib/ruby_reactor/storage/redis_adapter.rb', line 109

# Subscribes to a channel by delegating to the Redis client.
#
# @param channel [Object] channel, passed through unchanged
# @param block [Proc] handler forwarded to the client's subscribe call
# @return [Object] the client's subscribe result
def subscribe(channel, &block)
  handler = block
  @redis.subscribe(channel, &handler)
end