Class: StompServer::Queue
- Inherits:
-
Object
- Object
- StompServer::Queue
- Defined in:
- lib/stomp_server/queue.rb
Instance Method Summary collapse
- #assign_id(frame, dest) ⇒ Object
- #close_queue(dest) ⇒ Object
- #dequeue(dest) ⇒ Object
- #enqueue(dest, frame) ⇒ Object
-
#initialize(directory = '.stompserver', delete_empty = true) ⇒ Queue
constructor
A new instance of Queue.
- #message_for?(dest) ⇒ Boolean
- #monitor ⇒ Object
- #open_queue(dest) ⇒ Object
- #readframe(dest, msgid) ⇒ Object
- #requeue(dest, frame) ⇒ Object
- #save_queue_state ⇒ Object
- #stop ⇒ Object
- #writeframe(dest, frame, msgid) ⇒ Object
Constructor Details
#initialize(directory = '.stompserver', delete_empty = true) ⇒ Queue
Returns a new instance of Queue.
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/stomp_server/queue.rb', line 5

# Sets up the queue store rooted at +directory+ and restores any state found
# in "<directory>/qinfo" (the file written by #save_queue_state).
#
# directory    - directory holding the persisted queue state; created when missing.
# delete_empty - stored in @delete_empty; #close_queue only removes empty
#                queues when this is true.
def initialize(directory='.stompserver', delete_empty=true)
  @stompid = StompServer::StompId.new
  @delete_empty = delete_empty
  @directory = directory
  Dir.mkdir(@directory) unless File.directory?(@directory)

  qinfo_path = "#{@directory}/qinfo"
  # File.exists? was deprecated and then removed in Ruby 3.2; File.exist? is
  # the supported spelling.
  if File.exist?(qinfo_path)
    # NOTE(review): Marshal.load must only be given trusted data. This assumes
    # qinfo is written exclusively by #save_queue_state - confirm that the
    # directory is not writable by untrusted parties.
    qinfo = File.open(qinfo_path, "rb") { |f| Marshal.load(f.read) }
    @queues = qinfo[:queues]
    @frames = qinfo[:frames]
  else
    @queues = Hash.new
    @frames = Hash.new
  end

  @queues.keys.each do |dest|
    puts "Queue #{dest} size=#{@queues[dest][:size]} enqueued=#{@queues[dest][:enqueued]} dequeued=#{@queues[dest][:dequeued]}" if $DEBUG
  end

  puts "Queue initialized in #{@directory}"

  # Cleanup dead queues and save the state of the queues every so often. Alternatively we could save the queue state every X number
  # of frames that are put in the queue. Should probably also read it after saving it to confirm integrity.
  # Removed: this badly corrupted the queue when stopping with messages.
  #EventMachine::add_periodic_timer 1800, proc {@queues.keys.each {|dest| close_queue(dest)};save_queue_state }
end
Instance Method Details
#assign_id(frame, dest) ⇒ Object
135 136 137 |
# File 'lib/stomp_server/queue.rb', line 135

# Stamps +frame+ with a 'message-id' header and returns that id.
# The id is obtained by indexing @stompid with the current :msgid counter of
# the queue for +dest+; the counter itself is not advanced here.
def assign_id(frame, dest)
  sequence = @queues[dest][:msgid]
  message_id = @stompid[sequence]
  frame.headers['message-id'] = message_id
end
#close_queue(dest) ⇒ Object
56 57 58 59 60 61 62 63 |
# File 'lib/stomp_server/queue.rb', line 56

# Removes the queue for +dest+ when it is empty and @delete_empty is set.
# "Empty" means the :size counter is zero and no message ids are pending in
# :frames. Does nothing for a destination that has no entry in @queues.
def close_queue(dest)
  queue = @queues[dest]
  # Unknown destination: there is nothing to close (previously this raised
  # NoMethodError on nil).
  return unless queue
  return unless @delete_empty && queue[:size] == 0 && queue[:frames].empty?

  # _close_queue is not defined in this view; presumably the storage backend
  # releases its resources for +dest+ there - confirm.
  _close_queue(dest)
  @queues.delete(dest)
  @frames.delete(dest)
  puts "Queue #{dest} removed." if $DEBUG
end
#dequeue(dest) ⇒ Object
111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/stomp_server/queue.rb', line 111

# Takes the oldest pending message off the queue for +dest+.
#
# Returns the frame produced by #readframe, or false when +dest+ is nil/false,
# has no entry in @queues, or has no pending messages.
# On success the :size counter is decremented, :dequeued is incremented, the
# queue is offered to #close_queue and the state is persisted.
def dequeue(dest)
  return false unless dest

  queue = @queues[dest]
  # Bail out before touching any counter: an unknown destination used to raise
  # NoMethodError, and an empty queue used to read a nil msgid and push :size
  # below zero.
  return false if queue.nil? || queue[:frames].empty?

  msgid = queue[:frames].shift
  frame = readframe(dest, msgid)
  queue[:size] -= 1
  queue[:dequeued] += 1
  # NOTE(review): enqueue stores per-message metadata under @frames[dest], so
  # this delete on the queue record most likely removes nothing. Left as-is
  # because requeue still reads @frames[dest][msgid] after a dequeue - confirm
  # the intended lifetime of that metadata before changing it.
  queue.delete(msgid)
  close_queue(dest)
  save_queue_state
  frame
end
#enqueue(dest, frame) ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/stomp_server/queue.rb', line 95

# Appends +frame+ to the queue for +dest+, opening the queue first when it
# does not exist yet.
#
# The frame receives a message id via #assign_id, is handed to #writeframe,
# and gets a metadata record in @frames[dest] (exception count plus the
# 'client-id' / 'expires' headers when present). The queue's :msgid, :enqueued
# and :size counters are each bumped by one and the state is persisted.
# Always returns true.
def enqueue(dest, frame)
  open_queue(dest) unless @queues.has_key?(dest)

  msgid = assign_id(frame, dest)
  writeframe(dest, frame, msgid)

  queue = @queues[dest]
  queue[:frames].push(msgid)

  # Build the per-message metadata, copying optional headers only when set.
  meta = { :exceptions => 0 }
  { 'client-id' => :client_id, 'expires' => :expires }.each do |header, key|
    value = frame.headers[header]
    meta[key] = value if value
  end
  @frames[dest][msgid] = meta

  [:msgid, :enqueued, :size].each { |counter| queue[counter] += 1 }

  save_queue_state
  true
end
#message_for?(dest) ⇒ Boolean
123 124 125 |
# File 'lib/stomp_server/queue.rb', line 123

# Reports whether a message is waiting on +dest+.
# Returns true only when +dest+ has an entry in @queues and that entry's
# :frames list is not empty; false otherwise.
def message_for?(dest)
  @queues.key?(dest) && !@queues[dest][:frames].empty?
end
#monitor ⇒ Object
48 49 50 51 52 53 54 |
# File 'lib/stomp_server/queue.rb', line 48

# Builds a statistics snapshot of every known queue.
# Returns a Hash keyed by destination; each value is a Hash with the string
# keys 'size', 'enqueued' and 'dequeued' copied from the queue's counters.
def monitor
  @queues.each_with_object({}) do |(dest, info), stats|
    stats[dest] = {
      'size' => info[:size],
      'enqueued' => info[:enqueued],
      'dequeued' => info[:dequeued]
    }
  end
end
#open_queue(dest) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/stomp_server/queue.rb', line 65

# Creates (or resets) the bookkeeping for +dest+: a fresh queue record with
# zeroed counters, an empty pending list and :msgid starting at 1, plus an
# empty metadata table in @frames. Then hands +dest+ to _open_queue, which is
# not defined in this view (presumably the storage backend hook - confirm).
def open_queue(dest)
  @queues[dest] = {
    :size => 0,
    :frames => [],
    :msgid => 1,
    :enqueued => 0,
    :dequeued => 0,
    :exceptions => 0
  }
  @frames[dest] = {}

  _open_queue(dest)
  puts "Created queue #{dest}" if $DEBUG
end
#readframe(dest, msgid) ⇒ Object
131 132 133 |
# File 'lib/stomp_server/queue.rb', line 131

# Returns whatever _readframe yields for the message +msgid+ on +dest+.
# _readframe is not defined in this view; presumably it is supplied by the
# concrete storage backend - confirm.
def readframe(dest, msgid)
  _readframe(dest, msgid)
end
#requeue(dest, frame) ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/stomp_server/queue.rb', line 78

# Puts a previously delivered +frame+ back at the head of the queue for +dest+.
#
# When the frame carries a 'max-exceptions' header and its recorded failure
# count has reached that limit, the frame is enqueued on "/queue/deadletter"
# instead and nil is returned. Otherwise the frame is rewritten, its id is
# pushed to the front of the pending list, the failure counters are bumped,
# :dequeued is rolled back by one, the state is persisted and true is returned.
def requeue(dest, frame)
  open_queue(dest) unless @queues.has_key?(dest)
  msgid = frame.headers['message-id']

  # The metadata may be gone: close_queue drops @frames[dest] once a queue is
  # emptied, so a frame that was dequeued last arrives here without a record.
  # Treat that as "no failures yet" instead of raising NoMethodError on nil.
  meta = @frames[dest][msgid]
  failures = meta ? meta[:exceptions] : 0

  max_exceptions = frame.headers['max-exceptions']
  if max_exceptions && failures >= max_exceptions.to_i
    enqueue("/queue/deadletter", frame)
    return
  end

  writeframe(dest, frame, msgid)
  queue = @queues[dest]
  queue[:frames].unshift(msgid)
  meta ||= (@frames[dest][msgid] = { :exceptions => 0 })
  meta[:exceptions] += 1
  queue[:dequeued] -= 1
  queue[:exceptions] += 1
  queue[:size] += 1
  save_queue_state
  true
end
#save_queue_state ⇒ Object
42 43 44 45 46 |
# File 'lib/stomp_server/queue.rb', line 42

# Persists @queues and @frames to "<directory>/qinfo" as a Marshal dump of
# {:queues => ..., :frames => ...}. Returns the number of bytes written.
#
# The state is serialized before any file is opened and written to a sibling
# temp file that is then renamed over qinfo, so a failed dump or an
# interrupted write leaves the previous qinfo intact instead of truncated.
def save_queue_state
  puts "Saving Queue State" if $DEBUG
  payload = Marshal.dump({ :queues => @queues, :frames => @frames })

  target = "#{@directory}/qinfo"
  staging = "#{target}.tmp"
  written = File.open(staging, "wb") { |f| f.write(payload) }
  File.rename(staging, target)
  written
end
#stop ⇒ Object
32 33 34 35 36 37 38 39 40 |
# File 'lib/stomp_server/queue.rb', line 32

# Shuts the queue store down: offers every queue to #close_queue, prints the
# counters of the queues that remain when $DEBUG is set, and persists the
# state. Returns the result of #save_queue_state.
def stop
  puts "Shutting down Queue"

  # Iterate over a snapshot of the keys; close_queue may delete entries.
  @queues.keys.each { |dest| close_queue(dest) }

  if $DEBUG
    @queues.each do |dest, info|
      puts "Queue #{dest} size=#{info[:size]} enqueued=#{info[:enqueued]} dequeued=#{info[:dequeued]}"
    end
  end

  save_queue_state
end
#writeframe(dest, frame, msgid) ⇒ Object
127 128 129 |
# File 'lib/stomp_server/queue.rb', line 127

# Passes +frame+ for message +msgid+ on +dest+ to _writeframe and returns its
# result. _writeframe is not defined in this view; presumably it is supplied
# by the concrete storage backend - confirm.
def writeframe(dest, frame, msgid)
  _writeframe(dest, frame, msgid)
end