Class: KBS::Blackboard::MessageQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/kbs/blackboard/message_queue.rb

Instance Method Summary collapse

Constructor Details

#initialize(db) ⇒ MessageQueue

Returns a new instance of MessageQueue.



9
10
11
12
# File 'lib/kbs/blackboard/message_queue.rb', line 9

# Stores the database handle and makes sure the message table exists.
#
# @param db [Object] database handle used for every query in this class;
#   presumably a SQLite3::Database configured to return rows as hashes
#   (rows are read with string keys elsewhere) — confirm against the caller
def initialize(db)
  @db = db
  # Must run after @db is assigned: setup_table issues its DDL through @db.
  setup_table
end

Instance Method Details

#consume(topic, consumer) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/kbs/blackboard/message_queue.rb', line 47

# Claims the next unconsumed message on +topic+ and returns it.
#
# Messages are taken highest priority first, then oldest first. +id+ is the
# final tie-breaker because +posted_at+ is filled by CURRENT_TIMESTAMP, which
# only has one-second resolution; without it, messages posted within the same
# second would come back in an undefined order.
#
# @param topic [String] topic to take a message from
# @param consumer [String] identifier recorded in +consumed_by+
# @return [Hash, nil] hash with :id, :sender, :topic, :content, :priority and
#   :posted_at, or nil when the topic has no unconsumed message
def consume(topic, consumer)
  loop do
    row = @db.get_first_row(
      "SELECT * FROM blackboard_messages WHERE topic = ? AND consumed = 0 ORDER BY priority DESC, posted_at ASC, id ASC LIMIT 1",
      [topic]
    )
    return nil unless row

    # Claim the row only if it is still unconsumed. Another consumer may have
    # taken it between the SELECT above and this UPDATE; the extra predicate
    # plus the changes check prevents handing the same message out twice.
    @db.execute(
      "UPDATE blackboard_messages SET consumed = 1, consumed_by = ?, consumed_at = CURRENT_TIMESTAMP WHERE id = ? AND consumed = 0",
      [consumer, row['id']]
    )
    # Lost the race for this row: look for the next candidate instead.
    next if @db.changes.zero?

    # post stores String content verbatim, so the column may hold text that
    # is not JSON. Return that text as-is rather than raising after the
    # message has already been marked consumed.
    content =
      begin
        JSON.parse(row['content'], symbolize_names: true)
      rescue JSON::ParserError
        row['content']
      end

    return {
      id: row['id'],
      sender: row['sender'],
      topic: row['topic'],
      content: content,
      priority: row['priority'],
      # CURRENT_TIMESTAMP is stored as UTC without a zone suffix; Time.parse
      # would otherwise interpret it in the local time zone.
      posted_at: Time.parse("#{row['posted_at']} UTC")
    }
  end
end

#peek(topic, limit: 10) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/kbs/blackboard/message_queue.rb', line 70

# Lists upcoming unconsumed messages on +topic+ without marking them consumed.
#
# Rows are ordered highest priority first, then oldest first. +id+ is the
# final tie-breaker because +posted_at+ is filled by CURRENT_TIMESTAMP, which
# only has one-second resolution.
#
# @param topic [String] topic to inspect
# @param limit [Integer] maximum number of messages to return
# @return [Array<Hash>] one hash per message with :id, :sender, :topic,
#   :content, :priority and :posted_at; empty when nothing is pending
def peek(topic, limit: 10)
  rows = @db.execute(
    "SELECT * FROM blackboard_messages WHERE topic = ? AND consumed = 0 ORDER BY priority DESC, posted_at ASC, id ASC LIMIT ?",
    [topic, limit]
  )

  rows.map do |row|
    # post stores String content verbatim, so the column may hold text that
    # is not JSON. Fall back to the raw text so one such row does not make
    # the whole peek fail.
    content =
      begin
        JSON.parse(row['content'], symbolize_names: true)
      rescue JSON::ParserError
        row['content']
      end

    {
      id: row['id'],
      sender: row['sender'],
      topic: row['topic'],
      content: content,
      priority: row['priority'],
      # CURRENT_TIMESTAMP is stored as UTC without a zone suffix; Time.parse
      # would otherwise interpret it in the local time zone.
      posted_at: Time.parse("#{row['posted_at']} UTC")
    }
  end
end

#post(sender, topic, content, priority: 0) ⇒ Object



38
39
40
41
42
43
44
45
# File 'lib/kbs/blackboard/message_queue.rb', line 38

# Inserts a new message row for +topic+.
#
# @param sender [String] stored in the +sender+ column
# @param topic [String] stored in the +topic+ column
# @param content [String, Object] a String is stored unchanged; any other
#   value is serialized with JSON.generate first
# @param priority [Integer] stored in the +priority+ column
# @return [Object] whatever the database handle's +execute+ returns
def post(sender, topic, content, priority: 0)
  payload = content
  payload = JSON.generate(content) unless content.is_a?(String)

  insert_sql = "INSERT INTO blackboard_messages (sender, topic, content, priority) VALUES (?, ?, ?, ?)"
  @db.execute(insert_sql, [sender, topic, payload, priority])
end

#setup_table ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/kbs/blackboard/message_queue.rb', line 14

# Creates the blackboard_messages table and its two lookup indexes when they
# do not exist yet. Every statement uses IF NOT EXISTS, so the method can be
# called again on a database that already has the schema.
#
# @return [Object] the result of the final index-creation +execute+ call
def setup_table
  table_ddl = <<-SQL
    CREATE TABLE IF NOT EXISTS blackboard_messages (
      id INTEGER PRIMARY KEY AUTOINCREMENT,
      sender TEXT NOT NULL,
      topic TEXT NOT NULL,
      content TEXT NOT NULL,
      priority INTEGER DEFAULT 0,
      posted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
      consumed BOOLEAN DEFAULT 0,
      consumed_by TEXT,
      consumed_at TIMESTAMP
    );
  SQL

  topic_index_ddl = <<-SQL
    CREATE INDEX IF NOT EXISTS idx_messages_topic ON blackboard_messages(topic);
  SQL

  consumed_index_ddl = <<-SQL
    CREATE INDEX IF NOT EXISTS idx_messages_consumed ON blackboard_messages(consumed);
  SQL

  # The table has to exist before the indexes that reference it.
  @db.execute_batch(table_ddl)
  @db.execute(topic_index_ddl)
  @db.execute(consumed_index_ddl)
end

#stats ⇒ Object



88
89
90
91
92
93
# File 'lib/kbs/blackboard/message_queue.rb', line 88

# Reports how many messages the table holds in total and how many of them
# have not been consumed yet.
#
# @return [Hash] counts under :total_messages and :unconsumed_messages
def stats
  total = @db.get_first_value("SELECT COUNT(*) FROM blackboard_messages")
  pending = @db.get_first_value("SELECT COUNT(*) FROM blackboard_messages WHERE consumed = 0")

  { total_messages: total, unconsumed_messages: pending }
end