Class: KBS::Blackboard::RedisMessageQueue
- Inherits:
  - Object
- Inheritance chain:
  - Object → KBS::Blackboard::RedisMessageQueue
- Defined in:
- lib/kbs/blackboard/redis_message_queue.rb
Overview
Redis-based message queue using sorted sets for priority ordering
Instance Method Summary
- #consume(topic, consumer) ⇒ Object
- #initialize(redis) ⇒ RedisMessageQueue (constructor): a new instance of RedisMessageQueue.
- #peek(topic, limit: 10) ⇒ Object
- #post(sender, topic, content, priority: 0) ⇒ Object
- #stats ⇒ Object
Constructor Details
#initialize(redis) ⇒ RedisMessageQueue
10 11 12 13 |
# File 'lib/kbs/blackboard/redis_message_queue.rb', line 10

# Builds a message queue on top of an existing Redis connection.
#
# @param redis [Object] Redis client; it is stored as-is and every other
#   method of this class sends its commands through it
# @return [RedisMessageQueue] a new instance of RedisMessageQueue
def initialize(redis)
  @redis = redis
  # Name of the Redis key holding the id counter.
  # NOTE(review): the instance variable name was lost in the extracted
  # listing; @message_id_counter is a reconstruction — confirm against the file.
  @message_id_counter = "message_id_counter"
end
Instance Method Details
#consume(topic, consumer) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/kbs/blackboard/redis_message_queue.rb', line 41

# Takes the message at the head of a topic's sorted set, flags it as
# consumed and removes its id from the set.
#
# @param topic [#to_s] topic name; the sorted set read is "messages:<topic>"
# @param consumer [Object] value written to the message's 'consumed_by' field
# @return [Hash, nil] the message as a hash with :id, :sender, :topic,
#   :content, :priority and :posted_at, or nil when the set is empty, the
#   head id has no stored fields, or the head is already flagged consumed
def consume(topic, consumer)
  queue_key = "messages:#{topic}"

  # Get highest priority (lowest score) unconsumed message
  message_id = @redis.zrange(queue_key, 0, 0).first
  return nil if message_id.nil?

  message_key = "message:#{message_id}"
  data = @redis.hgetall(message_key)
  return nil if data.empty? || data['consumed'] == '1'

  # Mark as consumed
  # NOTE(review): the read above and this write are separate round trips, so
  # two concurrent consumers could presumably receive the same message —
  # confirm whether callers serialize access.
  @redis.multi do |pipeline|
    pipeline.hset(message_key, 'consumed', '1')
    pipeline.hset(message_key, 'consumed_by', consumer)
    pipeline.hset(message_key, 'consumed_at', Time.now.to_f)
    pipeline.zrem(queue_key, message_id)
  end

  # The message is already removed at this point, so a payload that is not
  # valid JSON is handed back as the raw string instead of raising and
  # losing the message.
  content =
    begin
      JSON.parse(data['content'], symbolize_names: true)
    rescue JSON::ParserError
      data['content']
    end

  {
    id: data['id'].to_i,
    sender: data['sender'],
    topic: data['topic'],
    content: content,
    priority: data['priority'].to_i,
    posted_at: Time.at(data['posted_at'].to_f)
  }
end
#peek(topic, limit: 10) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/kbs/blackboard/redis_message_queue.rb', line 69

# Lists messages from the front of a topic's sorted set without marking
# them consumed or removing them.
#
# Only the first `limit` ids are fetched; ids whose hash is empty or flagged
# consumed are skipped, so fewer than `limit` messages may come back.
#
# @param topic [#to_s] topic name; the sorted set read is "messages:<topic>"
# @param limit [Integer] maximum number of ids to inspect
# @return [Array<Hash>] messages with :id, :sender, :topic, :content,
#   :priority and :posted_at, in sorted-set order
def peek(topic, limit: 10)
  # A limit of 0 or less would make the stop index negative, and ZRANGE
  # counts negative indexes from the end of the set.
  return [] if limit <= 0

  # Get top N messages without consuming
  message_ids = @redis.zrange("messages:#{topic}", 0, limit - 1)

  message_ids.each_with_object([]) do |message_id, messages|
    data = @redis.hgetall("message:#{message_id}")
    next if data.empty? || data['consumed'] == '1'

    # A payload that is not valid JSON is returned as the raw string so one
    # such message does not abort the whole listing.
    content =
      begin
        JSON.parse(data['content'], symbolize_names: true)
      rescue JSON::ParserError
        data['content']
      end

    messages << {
      id: data['id'].to_i,
      sender: data['sender'],
      topic: data['topic'],
      content: content,
      priority: data['priority'].to_i,
      posted_at: Time.at(data['posted_at'].to_f)
    }
  end
end
#post(sender, topic, content, priority: 0) ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/kbs/blackboard/redis_message_queue.rb', line 15

# Stores a new message and enqueues its id on the topic's sorted set.
#
# @param sender [Object] value stored in the message's 'sender' field
# @param topic [#to_s] topic name; the id is added to "messages:<topic>"
# @param content [String, Object] a String is stored verbatim, anything
#   else is serialized with JSON.generate
# @param priority [Numeric] higher values produce a lower score, and the
#   other methods read the set from the lowest score upward
# @return [Object] the id returned by incr for the new message
def post(sender, topic, content, priority: 0)
  # NOTE(review): the counter key argument was lost in the extracted
  # listing; @message_id_counter is a reconstruction — confirm against the file.
  message_id = @redis.incr(@message_id_counter)
  content_json = content.is_a?(String) ? content : JSON.generate(content)
  timestamp = Time.now.to_f

  message_data = {
    'id' => message_id,
    'sender' => sender,
    'topic' => topic,
    'content' => content_json,
    'priority' => priority,
    'posted_at' => timestamp,
    'consumed' => '0'
  }

  # Store message data
  @redis.hset("message:#{message_id}", message_data)

  # Add to topic queue with score = -priority (for DESC ordering) + timestamp (for ASC within priority)
  # Score: higher priority = lower score (negative), then by timestamp
  # NOTE(review): one priority step is worth 1_000_000 in the score while the
  # timestamp is in epoch seconds, so a message posted more than 1_000_000
  # seconds after a lower-priority one sorts behind it — confirm this is intended.
  score = (-priority * 1_000_000) + timestamp
  @redis.zadd("messages:#{topic}", score, message_id)

  message_id
end
#stats ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/kbs/blackboard/redis_message_queue.rb', line 91

# Reports how many messages are stored and how many are still queued.
#
# @return [Hash] :total_messages is the number of keys matching 'message:*';
#   :unconsumed_messages is the summed cardinality of every sorted set
#   matching 'messages:*'
def stats
  # NOTE(review): KEYS walks the entire keyspace on each call; consider an
  # incremental scan if this runs against a large production database.

  # Count all unconsumed messages across all topics
  topics = @redis.keys('messages:*')
  total_unconsumed = topics.sum { |topic_key| @redis.zcard(topic_key) }

  # Count all messages (including consumed)
  message_keys = @redis.keys('message:*')
  total_messages = message_keys.size

  {
    total_messages: total_messages,
    unconsumed_messages: total_unconsumed
  }
end