Class: QueueManager
- Inherits:
-
Object
- Object
- QueueManager
- Defined in:
- lib/queue_manager.rb
Overview
queue - persistent sent to a single subscriber queue_monitor - looks, but does not remove from queue
Instance Method Summary collapse
- #ack(user, frame) ⇒ Object
- #disconnect(user) ⇒ Object
-
#initialize(journal) ⇒ QueueManager
constructor
A new instance of QueueManager.
- #send_backlog(queue, user) ⇒ Object
- #send_to_user(frame, user) ⇒ Object
- #sendmsg(frame) ⇒ Object
- #subscribe(dest, user, use_ack = false) ⇒ Object
- #unsubscribe(topic, user) ⇒ Object
Constructor Details
#initialize(journal) ⇒ QueueManager
Returns a new instance of QueueManager.
7 8 9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/queue_manager.rb', line 7 def initialize(journal) # read journal information @journal = journal @queues = Hash.new { Array.new } @pending = Hash.new { Array.new } @messages = Hash.new { Array.new } # recover from previous run msgids = @journal.keys.sort msgids.each do |msgid| sendmsg(@journal[msgid]) end end |
Instance Method Details
#ack(user, frame) ⇒ Object
43 44 45 46 47 48 49 |
# File 'lib/queue_manager.rb', line 43 def ack(user, frame) pending_size = @pending[user] msgid = frame.headers['message-id'] @pending[user].delete_if { |pf| pf.headers['message-id'] == msgid } raise "Message (#{msgid}) not found" if pending_size == @pending[user] @journal.delete(msgid) end |
#disconnect(user) ⇒ Object
51 52 53 54 55 56 57 58 |
# File 'lib/queue_manager.rb', line 51 def disconnect(user) @pending[user].each do |frame| sendmsg(frame) end @queues.each do |dest, queue| queue.delete_if { |qu| qu.user == user } end end |
#send_backlog(queue, user) ⇒ Object
31 32 33 34 35 36 37 |
# File 'lib/queue_manager.rb', line 31 def send_backlog(queue, user) until queue.empty? current = queue.first send_to_user(current, user) queue.shift end end |
#send_to_user(frame, user) ⇒ Object
60 61 62 63 64 65 66 67 |
# File 'lib/queue_manager.rb', line 60 def send_to_user(frame, user) if user.ack @pending[user.user] += [frame] else @journal.delete(frame.headers['message-id']) end user.user.send_data(frame.to_s) end |
#sendmsg(frame) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/queue_manager.rb', line 69 def sendmsg(frame) frame.command = "MESSAGE" dest = frame.headers['destination'] @journal[frame.headers['message-id']] = frame if user = @queues[dest].shift send_to_user(frame, user) @queues[dest].push(user) else @messages[dest] += [frame] end end |
#subscribe(dest, user, use_ack = false) ⇒ Object
21 22 23 24 25 26 27 28 29 |
# File 'lib/queue_manager.rb', line 21 def subscribe(dest, user, use_ack=false) user = Struct::QueueUser.new(user, use_ack) @queues[dest] += [user] # TODO handle this is some form of call back # it is quite possible that this could be a lot # of data and block things up. send_backlog(@messages[dest], user) end |
#unsubscribe(topic, user) ⇒ Object
39 40 41 |
# File 'lib/queue_manager.rb', line 39 def unsubscribe(topic, user) @queues[topic].delete_if { |u| u.user == user } end |