Class: KBS::Blackboard::MessageQueue
- Inherits:
-
Object
- Object
- KBS::Blackboard::MessageQueue
- Defined in:
- lib/kbs/blackboard/message_queue.rb
Instance Method Summary collapse
- #consume(topic, consumer) ⇒ Object
-
#initialize(db) ⇒ MessageQueue
constructor
A new instance of MessageQueue.
- #peek(topic, limit: 10) ⇒ Object
- #post(sender, topic, content, priority: 0) ⇒ Object
- #setup_table ⇒ Object
- #stats ⇒ Object
Constructor Details
#initialize(db) ⇒ MessageQueue
Returns a new instance of MessageQueue.
9 10 11 12 |
# File 'lib/kbs/blackboard/message_queue.rb', line 9 def initialize(db) @db = db setup_table end |
Instance Method Details
#consume(topic, consumer) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/kbs/blackboard/message_queue.rb', line 47 def consume(topic, consumer) result = @db.get_first_row( "SELECT * FROM blackboard_messages WHERE topic = ? AND consumed = 0 ORDER BY priority DESC, posted_at ASC LIMIT 1", [topic] ) if result @db.execute( "UPDATE blackboard_messages SET consumed = 1, consumed_by = ?, consumed_at = CURRENT_TIMESTAMP WHERE id = ?", [consumer, result['id']] ) { id: result['id'], sender: result['sender'], topic: result['topic'], content: JSON.parse(result['content'], symbolize_names: true), priority: result['priority'], posted_at: Time.parse(result['posted_at']) } end end |
#peek(topic, limit: 10) ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/kbs/blackboard/message_queue.rb', line 70 def peek(topic, limit: 10) results = @db.execute( "SELECT * FROM blackboard_messages WHERE topic = ? AND consumed = 0 ORDER BY priority DESC, posted_at ASC LIMIT ?", [topic, limit] ) results.map do |row| { id: row['id'], sender: row['sender'], topic: row['topic'], content: JSON.parse(row['content'], symbolize_names: true), priority: row['priority'], posted_at: Time.parse(row['posted_at']) } end end |
#post(sender, topic, content, priority: 0) ⇒ Object
38 39 40 41 42 43 44 45 |
# File 'lib/kbs/blackboard/message_queue.rb', line 38 def post(sender, topic, content, priority: 0) content_json = content.is_a?(String) ? content : JSON.generate(content) @db.execute( "INSERT INTO blackboard_messages (sender, topic, content, priority) VALUES (?, ?, ?, ?)", [sender, topic, content_json, priority] ) end |
#setup_table ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/kbs/blackboard/message_queue.rb', line 14 def setup_table @db.execute_batch <<-SQL CREATE TABLE IF NOT EXISTS blackboard_messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, sender TEXT NOT NULL, topic TEXT NOT NULL, content TEXT NOT NULL, priority INTEGER DEFAULT 0, posted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, consumed BOOLEAN DEFAULT 0, consumed_by TEXT, consumed_at TIMESTAMP ); SQL @db.execute <<-SQL CREATE INDEX IF NOT EXISTS idx_messages_topic ON blackboard_messages(topic); SQL @db.execute <<-SQL CREATE INDEX IF NOT EXISTS idx_messages_consumed ON blackboard_messages(consumed); SQL end |
#stats ⇒ Object
88 89 90 91 92 93 |
# File 'lib/kbs/blackboard/message_queue.rb', line 88 def stats { total_messages: @db.get_first_value("SELECT COUNT(*) FROM blackboard_messages"), unconsumed_messages: @db.get_first_value("SELECT COUNT(*) FROM blackboard_messages WHERE consumed = 0") } end |