Class: KBS::Blackboard::RedisMessageQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/kbs/blackboard/redis_message_queue.rb

Overview

Redis-based message queue using sorted sets for priority ordering

Instance Method Summary collapse

Constructor Details

#initialize(redis) ⇒ RedisMessageQueue



10
11
12
13
# File 'lib/kbs/blackboard/redis_message_queue.rb', line 10

def initialize(redis)
  @redis = redis
  @message_id_counter = "message_id_counter"
end

Instance Method Details

#consume(topic, consumer) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/kbs/blackboard/redis_message_queue.rb', line 41

def consume(topic, consumer)
  # Get highest priority (lowest score) unconsumed message
  messages = @redis.zrange("messages:#{topic}", 0, 0)
  return nil if messages.empty?

  message_id = messages.first
  message_data = @redis.hgetall("message:#{message_id}")

  return nil if message_data.empty? || message_data['consumed'] == '1'

  # Mark as consumed
  @redis.multi do |pipeline|
    pipeline.hset("message:#{message_id}", 'consumed', '1')
    pipeline.hset("message:#{message_id}", 'consumed_by', consumer)
    pipeline.hset("message:#{message_id}", 'consumed_at', Time.now.to_f)
    pipeline.zrem("messages:#{topic}", message_id)
  end

  {
    id: message_data['id'].to_i,
    sender: message_data['sender'],
    topic: message_data['topic'],
    content: JSON.parse(message_data['content'], symbolize_names: true),
    priority: message_data['priority'].to_i,
    posted_at: Time.at(message_data['posted_at'].to_f)
  }
end

#peek(topic, limit: 10) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/kbs/blackboard/redis_message_queue.rb', line 69

def peek(topic, limit: 10)
  # Get top N messages without consuming
  message_ids = @redis.zrange("messages:#{topic}", 0, limit - 1)
  messages = []

  message_ids.each do |message_id|
    message_data = @redis.hgetall("message:#{message_id}")
    next if message_data.empty? || message_data['consumed'] == '1'

    messages << {
      id: message_data['id'].to_i,
      sender: message_data['sender'],
      topic: message_data['topic'],
      content: JSON.parse(message_data['content'], symbolize_names: true),
      priority: message_data['priority'].to_i,
      posted_at: Time.at(message_data['posted_at'].to_f)
    }
  end

  messages
end

#post(sender, topic, content, priority: 0) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/kbs/blackboard/redis_message_queue.rb', line 15

def post(sender, topic, content, priority: 0)
  message_id = @redis.incr(@message_id_counter)
  content_json = content.is_a?(String) ? content : JSON.generate(content)
  timestamp = Time.now.to_f

  message_data = {
    'id' => message_id,
    'sender' => sender,
    'topic' => topic,
    'content' => content_json,
    'priority' => priority,
    'posted_at' => timestamp,
    'consumed' => '0'
  }

  # Store message data
  @redis.hset("message:#{message_id}", message_data)

  # Add to topic queue with score = -priority (for DESC ordering) + timestamp (for ASC within priority)
  # Score: higher priority = lower score (negative), then by timestamp
  score = (-priority * 1_000_000) + timestamp
  @redis.zadd("messages:#{topic}", score, message_id)

  message_id
end

#statsObject



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/kbs/blackboard/redis_message_queue.rb', line 91

def stats
  # Count all unconsumed messages across all topics
  topics = @redis.keys('messages:*')
  total_unconsumed = 0

  topics.each do |topic_key|
    total_unconsumed += @redis.zcard(topic_key)
  end

  # Count all messages (including consumed)
  all_message_keys = @redis.keys('message:*')
  total_messages = all_message_keys.size

  {
    total_messages: total_messages,
    unconsumed_messages: total_unconsumed
  }
end