Class: StompServer::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/stomp_server/queue.rb

Direct Known Subclasses

DBMQueue, FileQueue

Instance Method Summary collapse

Constructor Details

#initialize(directory = '.stompserver', delete_empty = true) ⇒ Queue

Returns a new instance of Queue.



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/stomp_server/queue.rb', line 5

def initialize(directory='.stompserver', delete_empty=true)
  @stompid = StompServer::StompId.new
  @delete_empty = delete_empty
  @directory = directory
  Dir.mkdir(@directory) unless File.directory?(@directory)
  if File.exists?("#{@directory}/qinfo")
    qinfo = Hash.new
    File.open("#{@directory}/qinfo", "rb") { |f| qinfo = Marshal.load(f.read)}
    @queues = qinfo[:queues]
    @frames = qinfo[:frames]
  else
    @queues = Hash.new
    @frames = Hash.new
  end

  @queues.keys.each do |dest|
    puts "Queue #{dest} size=#{@queues[dest][:size]} enqueued=#{@queues[dest][:enqueued]} dequeued=#{@queues[dest][:dequeued]}" if $DEBUG
  end

  puts "Queue initialized in #{@directory}"

  # Cleanup dead queues and save the state of the queues every so often.  Alternatively we could save the queue state every X number
  # of frames that are put in the queue.  Should probably also read it after saving it to confirm integrity.
  # Removed, this badly corrupt the queue when stopping with messages
  #EventMachine::add_periodic_timer 1800, proc {@queues.keys.each {|dest| close_queue(dest)};save_queue_state }
end

Instance Method Details

#assign_id(frame, dest) ⇒ Object



135
136
137
# File 'lib/stomp_server/queue.rb', line 135

def assign_id(frame, dest)
  frame.headers['message-id'] = @stompid[@queues[dest][:msgid]]
end

#close_queue(dest) ⇒ Object



56
57
58
59
60
61
62
63
# File 'lib/stomp_server/queue.rb', line 56

def close_queue(dest)
  if @queues[dest][:size] == 0 and @queues[dest][:frames].size == 0 and @delete_empty
    _close_queue(dest)
    @queues.delete(dest)
    @frames.delete(dest)
    puts "Queue #{dest} removed." if $DEBUG
  end
end

#dequeue(dest) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
# File 'lib/stomp_server/queue.rb', line 111

def dequeue(dest)
  return false unless message_for?(dest)
  msgid = @queues[dest][:frames].shift
  frame = readframe(dest,msgid)
  @queues[dest][:size] -= 1
  @queues[dest][:dequeued] += 1
  @queues[dest].delete(msgid)
  close_queue(dest)
  save_queue_state
  return frame
end

#enqueue(dest, frame) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/stomp_server/queue.rb', line 95

def enqueue(dest,frame)
  open_queue(dest) unless @queues.has_key?(dest)
  msgid = assign_id(frame, dest)
  writeframe(dest,frame,msgid)
  @queues[dest][:frames].push(msgid)
  @frames[dest][msgid] = Hash.new
  @frames[dest][msgid][:exceptions] =0
  @frames[dest][msgid][:client_id] = frame.headers['client-id'] if frame.headers['client-id']
  @frames[dest][msgid][:expires] = frame.headers['expires'] if frame.headers['expires']
  @queues[dest][:msgid] += 1
  @queues[dest][:enqueued] += 1
  @queues[dest][:size] += 1
  save_queue_state
  return true
end

#message_for?(dest) ⇒ Boolean

Returns:

  • (Boolean)


123
124
125
# File 'lib/stomp_server/queue.rb', line 123

def message_for?(dest)
  return (@queues.has_key?(dest) and (!@queues[dest][:frames].empty?))
end

#monitorObject



48
49
50
51
52
53
54
# File 'lib/stomp_server/queue.rb', line 48

def monitor
  stats = Hash.new
  @queues.keys.each do |dest|
    stats[dest] = {'size' => @queues[dest][:size], 'enqueued' => @queues[dest][:enqueued], 'dequeued' => @queues[dest][:dequeued]}
  end
  stats
end

#open_queue(dest) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/stomp_server/queue.rb', line 65

def open_queue(dest)
  @queues[dest] = Hash.new
  @frames[dest] = Hash.new
  @queues[dest][:size] = 0
  @queues[dest][:frames] = Array.new
  @queues[dest][:msgid] = 1
  @queues[dest][:enqueued] = 0
  @queues[dest][:dequeued] = 0
  @queues[dest][:exceptions] = 0
  _open_queue(dest)
  puts "Created queue #{dest}" if $DEBUG
end

#readframe(dest, msgid) ⇒ Object



131
132
133
# File 'lib/stomp_server/queue.rb', line 131

def readframe(dest,msgid)
  _readframe(dest,msgid)
end

#requeue(dest, frame) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/stomp_server/queue.rb', line 78

def requeue(dest,frame)
  open_queue(dest) unless @queues.has_key?(dest)
  msgid = frame.headers['message-id']
  if frame.headers['max-exceptions'] and @frames[dest][msgid][:exceptions] >= frame.headers['max-exceptions'].to_i
    enqueue("/queue/deadletter",frame)
    return
  end
  writeframe(dest,frame,msgid)
  @queues[dest][:frames].unshift(msgid)
  @frames[dest][msgid][:exceptions] += 1
  @queues[dest][:dequeued] -= 1
  @queues[dest][:exceptions] += 1
  @queues[dest][:size] += 1
  save_queue_state
  return true
end

#save_queue_stateObject



42
43
44
45
46
# File 'lib/stomp_server/queue.rb', line 42

def save_queue_state
  puts "Saving Queue State" if $DEBUG
  qinfo = {:queues => @queues, :frames => @frames}
  File.open("#{@directory}/qinfo", "wb") { |f| f.write Marshal.dump(qinfo)}
end

#stopObject



32
33
34
35
36
37
38
39
40
# File 'lib/stomp_server/queue.rb', line 32

def stop
  puts "Shutting down Queue"

  @queues.keys.each {|dest| close_queue(dest)}
  @queues.keys.each do |dest|
    puts "Queue #{dest} size=#{@queues[dest][:size]} enqueued=#{@queues[dest][:enqueued]} dequeued=#{@queues[dest][:dequeued]}" if $DEBUG
  end
  save_queue_state
end

#writeframe(dest, frame, msgid) ⇒ Object



127
128
129
# File 'lib/stomp_server/queue.rb', line 127

def writeframe(dest,frame,msgid)
  _writeframe(dest,frame,msgid)
end