Class: StompServer::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/stomp_server_ng/queue.rb

Overview

Queue

Direct Known Subclasses

DBMQueue, FileQueue

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(directory = '.stompserver', delete_empty = true) ⇒ Queue

initiialize



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/stomp_server_ng/queue.rb', line 13

def initialize(directory='.stompserver', delete_empty=true)

  @@log = Logger.new(STDOUT)
  @@log.level = StompServer::LogHelper.get_loglevel()
  @@log.debug("Q #{self} initialization starts")

  @stompid = StompServer::StompId.new
  @delete_empty = delete_empty
  @directory = directory
  Dir.mkdir(@directory) unless File.directory?(@directory)
  if File.exists?("#{@directory}/qinfo")
    qinfo = Hash.new
    File.open("#{@directory}/qinfo", "rb") { |f| qinfo = Marshal.load(f.read)}
    @queues = qinfo[:queues]
    @frames = qinfo[:frames]
  else
    @queues = Hash.new
    @frames = Hash.new
  end

  @queues.keys.each do |dest|
    @@log.debug "Q  #{self} dest=#{dest} size=#{@queues[dest][:size]} enqueued=#{@queues[dest][:enqueued]} dequeued=#{@queues[dest][:dequeued]}"
  end

  @@log.debug("Q #{self} initialized in #{@directory}")

  #
  # Cleanup dead queues and save the state of the queues every so often.  
  # Alternatively we could save the queue state every X number
  # of frames that are put in the queue.  
  # Should probably also read it after saving it to confirm integrity.
  #
  # Removed: this badly corrupts the queue when stopping with messages
  #
  # EventMachine::add_periodic_timer 1800, proc {@queues.keys.each 
  # {|dest| close_queue(dest)};save_queue_state }
  #
end

Instance Attribute Details

#checkpoint_intervalObject

the check point interval



10
11
12
# File 'lib/stomp_server_ng/queue.rb', line 10

def checkpoint_interval
  @checkpoint_interval
end

Instance Method Details

#assign_id(frame, dest) ⇒ Object

assign_id



229
230
231
232
233
# File 'lib/stomp_server_ng/queue.rb', line 229

def assign_id(frame, dest)
  @@log.debug "#{frame.headers['session']} assign_id, frame: #{frame}, dest: #{dest}"
  msg_id = @queues[dest].nil? ? 1 : @queues[dest][:msgid] 
  frame.headers['message-id'] = @stompid[msg_id] 
end

#close_queue(dest, session_id) ⇒ Object

close_queue



94
95
96
97
98
99
100
101
102
# File 'lib/stomp_server_ng/queue.rb', line 94

def close_queue(dest, session_id)
  @@log.debug "#{session_id} close_queue"
  if @queues[dest][:size] == 0 and @queues[dest][:frames].size == 0 and @delete_empty
    _close_queue(dest)
    @queues.delete(dest)
    @frames.delete(dest)
    @@log.debug "#{session_id} Queue #{dest} removed."
  end
end

#dequeue(dest, session_id) ⇒ Object

dequeue



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/stomp_server_ng/queue.rb', line 185

def dequeue(dest, session_id)
  @@log.debug "#{session_id} dequeue, dest: #{dest}"
  return false unless message_for?(dest, session_id)
  # update queues ... dest .... :frames here
  msgid = @queues[dest][:frames].shift
  frame = readframe(dest,msgid,session_id)
  @@log.debug("#{frame.headers['session']} Dequeue for message: #{msgid} Client: #{frame.headers['client-id'] if frame.headers['client-id']}")

  # update queues (queues[dest])
  # :size, :frames, :msgid, :enqueued, :dequeued, :exceptions
  @queues[dest][:size] -= 1
  # :frames - see above
  @queues[dest][:msgid] -= 1
  # :enqueued - no change
  @queues[dest][:dequeued] += 1
  # :exceptions - no change

  @queues[dest].delete(msgid)

  close_queue(dest, frame.headers['session'])
  save_queue_state(frame.headers['session'])
  return frame
end

#enqueue(dest, frame) ⇒ Object

enqueue



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/stomp_server_ng/queue.rb', line 161

def enqueue(dest,frame)
  @@log.debug "#{frame.headers['session']} enqueue"
  open_queue(dest, frame.headers['session']) unless @queues.has_key?(dest)
  msgid = assign_id(frame, dest)
  @@log.debug("#{frame.headers['session']} Enqueue for message: #{msgid} Client: #{frame.headers['client-id'] if frame.headers['client-id']}")
  writeframe(dest,frame,msgid)

  # update queues (queues[dest])
  # :size, :frames, :msgid, :enqueued, :dequeued, :exceptions
  @queues[dest][:size] += 1
  @queues[dest][:frames].push(msgid)
  @queues[dest][:msgid] += 1
  @queues[dest][:enqueued] += 1
  # no :dequeue here
  # no :exceptions here

  # Update frames
  # Initialize frames entry for this: dest, frame, and msgid
  new_frames_entry(dest, frame, msgid)
  save_queue_state(frame.headers['session'])
  return true
end

#message_for?(dest, session_id) ⇒ Boolean

messsage_for?

Returns:

  • (Boolean)


210
211
212
213
214
# File 'lib/stomp_server_ng/queue.rb', line 210

def message_for?(dest, session_id)
  retval = (@queues.has_key?(dest) and (!@queues[dest][:frames].empty?))
  @@log.debug "#{session_id} message_for?, dest: #{dest}, #{retval}"
  return retval
end

#monitorObject

monitor



79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/stomp_server_ng/queue.rb', line 79

def monitor
  @@log.debug "#{self} monitor"
  stats = Hash.new
  @queues.keys.each do |dest|
    stats[dest] = { 
      'size'        => @queues[dest][:size], 
      'enqueued'    => @queues[dest][:enqueued], 
      'dequeued'    => @queues[dest][:dequeued],
      'exceptions'  => @queues[dest][:exceptions],
    }
  end
  stats
end

#open_queue(dest, session_id) ⇒ Object

open_queue



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/stomp_server_ng/queue.rb', line 105

def open_queue(dest, session_id)
  @@log.debug "#{session_id} open_queue"
  # New queue
  @queues[dest] = Hash.new
  # New frames for this queue
  @frames[dest] = Hash.new
  # Update queues
  # :size, :frames, :msgid, :enqueued, :dequeued, :exceptions
  @queues[dest][:size] = 0
  @queues[dest][:frames] = Array.new
  @queues[dest][:msgid] = 1
  @queues[dest][:enqueued] = 0
  @queues[dest][:dequeued] = 0
  @queues[dest][:exceptions] = 0
  _open_queue(dest)
  @@log.debug "Created queue #{dest}"
end

#readframe(dest, msgid, session_id) ⇒ Object

readframe



223
224
225
226
# File 'lib/stomp_server_ng/queue.rb', line 223

def readframe(dest,msgid, session_id)
  @@log.debug "#{session_id} readframe, dest: #{dest}, msgid: #{msgid}"
  _readframe(dest,msgid)
end

#requeue(dest, frame) ⇒ Object

requeue



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/stomp_server_ng/queue.rb', line 124

def requeue(dest,frame)
  @@log.debug "#{frame.headers['session']} requeue, for #{dest}, frame: #{frame.inspect}"
  open_queue(dest, frame.headers['session']) unless @queues.has_key?(dest)
  msgid = frame.headers['message-id']
  #
  # Note: frame.headers['max-exceptions'] is currently _never_ set any where!
  #
  if frame.headers['max-exceptions'] and @frames[dest][msgid][:exceptions] >= frame.headers['max-exceptions'].to_i
    enqueue("/queue/deadletter",frame)
    return
  end
  #
  writeframe(dest,frame,msgid)

  # update queues (queues[dest])
  # :size, :frames, :msgid, :enqueued, :dequeued, :exceptions
  @queues[dest][:size] += 1
  @queues[dest][:frames].unshift(msgid)
  # no :msgid here
  # no :enqueued here
  # no :dequeued here
  @queues[dest][:exceptions] += 1

  # update frames
  #
  # Is this _always_ the case in this method ?????
  unless @frames[dest][msgid]
    new_frames_entry(dest, frame, msgid)
  end
  #
  @frames[dest][msgid][:exceptions] += 1
  @frames[dest][msgid][:requeued] += 1
  save_queue_state(frame.headers['session'])
  return true
end

#save_queue_state(session_id) ⇒ Object

save_queue_state



64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/stomp_server_ng/queue.rb', line 64

def save_queue_state(session_id)
  @@log.debug "#{session_id} save_queue_state"
  now=Time.now
  @next_save ||=now
  if now >= @next_save
    @@log.debug "#{session_id} saving state"
    qinfo = {:queues => @queues, :frames => @frames}
    # write then rename to make sure this is atomic
    File.open("#{@directory}/qinfo.new", "wb") { |f| f.write Marshal.dump(qinfo)}
    File.rename("#{@directory}/qinfo.new","#{@directory}/qinfo")
    @next_save=now+checkpoint_interval
  end
end

#stop(session_id) ⇒ Object

stop



53
54
55
56
57
58
59
60
61
# File 'lib/stomp_server_ng/queue.rb', line 53

def stop(session_id)
  @@log.debug "#{session_id} Shutting down Queues, queue count: #{@queues.size}"
  #
  @queues.keys.each do |dest|
    @@log.debug "#{session_id}: Queue #{dest}: size=#{@queues[dest][:size]} enqueued=#{@queues[dest][:enqueued]} dequeued=#{@queues[dest][:dequeued]}"
    close_queue(dest, session_id)
  end
  save_queue_state(session_id)
end

#writeframe(dest, frame, msgid) ⇒ Object

writeframe



217
218
219
220
# File 'lib/stomp_server_ng/queue.rb', line 217

def writeframe(dest,frame,msgid)
  @@log.debug "#{frame.headers['session']} writeframe, dest: #{dest}, frame: #{frame}, msgid: #{msgid}"
  _writeframe(dest,frame,msgid)
end