Class: StompServer::QueueManager
- Inherits:
-
Object
- Object
- StompServer::QueueManager
- Defined in:
- lib/stomp_server/queue_manager.rb
Instance Method Summary collapse
- #ack(connection, frame) ⇒ Object
-
#dequeue(dest) ⇒ Object
For protocol handlers that want direct access to the queue.
- #disconnect(connection) ⇒ Object
- #enqueue(frame) ⇒ Object
-
#initialize(qstore) ⇒ QueueManager
constructor
A new instance of QueueManager.
-
#send_a_backlog(connection) ⇒ Object
Send at most one frame to a connection used when use_ack == true.
- #send_destination_backlog(dest, user) ⇒ Object
- #send_to_user(frame, user) ⇒ Object
- #sendmsg(frame) ⇒ Object
- #stop ⇒ Object
- #subscribe(dest, connection, use_ack = false) ⇒ Object
- #unsubscribe(dest, connection) ⇒ Object
Constructor Details
#initialize(qstore) ⇒ QueueManager
Returns a new instance of QueueManager.
63 64 65 66 67 68 69 70 71 72 |
# File 'lib/stomp_server/queue_manager.rb', line 63 def initialize(qstore) @qstore = qstore @queues = Hash.new { Array.new } @pending = Hash.new if $STOMP_SERVER monitor = StompServer::QueueMonitor.new(@qstore,@queues) monitor.start puts "Queue monitor started" if $DEBUG end end |
Instance Method Details
#ack(connection, frame) ⇒ Object
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/stomp_server/queue_manager.rb', line 128 def ack(connection, frame) puts "Acking #{frame.headers['message-id']}" if $DEBUG unless @pending[connection] puts "No message pending for connection!" return end msgid = frame.headers['message-id'] p_msgid = @pending[connection].headers['message-id'] if p_msgid != msgid # We don't know what happened, we requeue # (probably a client connecting to a restarted server) frame = @pending[connection] @qstore.requeue(frame.headers['destination'],frame) puts "Invalid message-id (received #{msgid} != #{p_msgid})" end @pending.delete connection # We are free to work now, look if there's something for us send_a_backlog(connection) end |
#dequeue(dest) ⇒ Object
For protocol handlers that want direct access to the queue
198 199 200 |
# File 'lib/stomp_server/queue_manager.rb', line 198 def dequeue(dest) @qstore.dequeue(dest) end |
#disconnect(connection) ⇒ Object
148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/stomp_server/queue_manager.rb', line 148 def disconnect(connection) puts "Disconnecting" frame = @pending[connection] if frame @qstore.requeue(frame.headers['destination'],frame) @pending.delete connection end @queues.each do |dest, queue| queue.delete_if { |qu| qu.connection == connection } @queues.delete(dest) if queue.empty? end end |
#enqueue(frame) ⇒ Object
202 203 204 205 206 |
# File 'lib/stomp_server/queue_manager.rb', line 202 def enqueue(frame) frame.command = "MESSAGE" dest = frame.headers['destination'] @qstore.enqueue(dest,frame) end |
#send_a_backlog(connection) ⇒ Object
Send at most one frame to a connection used when use_ack == true
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/stomp_server/queue_manager.rb', line 87 def send_a_backlog(connection) puts "Sending a backlog" if $DEBUG # lookup queues with data for this connection possible_queues = @queues.select{ |destination,users| @qstore.(destination) && users.detect{|u| u.connection == connection} } if possible_queues.empty? puts "Nothing left" if $DEBUG return end # Get a random one (avoid artificial priority between queues # without coding a whole scheduler, which might be desirable later) dest,users = possible_queues[rand(possible_queues.length)] user = users.find{|u| u.connection == connection} frame = @qstore.dequeue(dest) puts "Chose #{dest}" if $DEBUG send_to_user(frame, user) end |
#send_destination_backlog(dest, user) ⇒ Object
107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/stomp_server/queue_manager.rb', line 107 def send_destination_backlog(dest,user) puts "Sending destination backlog for #{dest}" if $DEBUG if user.ack # only send one message (waiting for ack) frame = @qstore.dequeue(dest) send_to_user(frame, user) if frame else while frame = @qstore.dequeue(dest) send_to_user(frame, user) end end end |
#send_to_user(frame, user) ⇒ Object
162 163 164 165 166 167 168 169 |
# File 'lib/stomp_server/queue_manager.rb', line 162 def send_to_user(frame, user) connection = user.connection if user.ack raise "other connection's end already busy" if @pending[connection] @pending[connection] = frame end connection.stomp_send_data(frame) end |
#sendmsg(frame) ⇒ Object
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
# File 'lib/stomp_server/queue_manager.rb', line 171 def sendmsg(frame) frame.command = "MESSAGE" dest = frame.headers['destination'] puts "Sending a message to #{dest}: #{frame}" # Lookup a user willing to handle this destination available_users = @queues[dest].reject{|user| @pending[user.connection]} if available_users.empty? @qstore.enqueue(dest,frame) return end # Look for a user with ack (we favor reliability) reliable_user = available_users.find{|u| u.ack} if reliable_user # give it a message-id @qstore.assign_id(frame, dest) send_to_user(frame, reliable_user) else random_user = available_users[rand(available_users.length)] # Note message-id header isn't set but we won't need it anyway # <TODO> could break some clients: fix this send_to_user(frame, random_user) end end |
#stop ⇒ Object
74 75 76 |
# File 'lib/stomp_server/queue_manager.rb', line 74 def stop @qstore.stop if @qstore.methods.include?('stop') end |
#subscribe(dest, connection, use_ack = false) ⇒ Object
78 79 80 81 82 83 |
# File 'lib/stomp_server/queue_manager.rb', line 78 def subscribe(dest, connection, use_ack=false) puts "Subscribing to #{dest}" user = Struct::QueueUser.new(connection, use_ack) @queues[dest] += [user] send_destination_backlog(dest,user) unless dest == '/queue/monitor' end |
#unsubscribe(dest, connection) ⇒ Object
120 121 122 123 124 125 126 |
# File 'lib/stomp_server/queue_manager.rb', line 120 def unsubscribe(dest, connection) puts "Unsubscribing from #{dest}" @queues.each do |d, queue| queue.delete_if { |qu| qu.connection == connection and d == dest} end @queues.delete(dest) if @queues[dest].empty? end |