Class: StompServer::FileQueue

Inherits:
Queue
  • Object
show all
Defined in:
lib/stomp_server_ng/queue/file_queue.rb

Overview

Low level physical queue handler.

Instance Attribute Summary

Attributes inherited from Queue

#checkpoint_interval

Instance Method Summary collapse

Methods inherited from Queue

#assign_id, #close_queue, #dequeue, #enqueue, #initialize, #message_for?, #monitor, #open_queue, #readframe, #requeue, #save_queue_state, #stop, #writeframe

Constructor Details

This class inherits a constructor from StompServer::Queue

Instance Method Details

#_close_queue(dest) ⇒ Object

Remove queue directory if it exists.



11
12
13
# File 'lib/stomp_server_ng/queue/file_queue.rb', line 11

# Remove the directory recorded for +dest+ in @queues, if it is present on disk.
#
# Returns the result of Dir.delete when the directory existed, nil otherwise.
# Dir.delete only removes empty directories and raises otherwise.
def _close_queue(dest)
  queue_dir = @queues[dest][:queue_dir]
  return unless File.directory?(queue_dir)

  Dir.delete(queue_dir)
end

#_open_queue(dest) ⇒ Object

Create queue directory if it does not already exist.



16
17
18
19
20
21
22
23
# File 'lib/stomp_server_ng/queue/file_queue.rb', line 16

# Create the on-disk directory for +dest+ (if missing) and record its path
# in @queues[dest][:queue_dir].
#
# The destination name is mapped to a directory name by first doubling every
# '_' and then turning every '/' into '_', so that e.g. '/queue/a_b' and
# '/queue/a/b' do not end up sharing a directory. Both substitutions must be
# applied to the same string; previously the second one restarted from +dest+
# and silently discarded the underscore escaping.
#
# NOTE(review): directories created by the old (unescaped) mapping for
# destinations containing '_' will not be found under the new name — confirm
# whether existing on-disk queues need migrating. An '_' directly adjacent to
# a '/' can still collide (e.g. 'a_/b' vs 'a/_b').
#
# Returns the result of Dir.mkdir when the directory was created, nil otherwise.
def _open_queue(dest)
  # handle clashes between _ and /
  queue_name = dest.gsub('_', '__').gsub('/', '_')
  queue_dir = @directory + '/' + queue_name
  @queues[dest][:queue_dir] = queue_dir
  Dir.mkdir(queue_dir) unless File.directory?(queue_dir)
end

#_readframe(dest, msgid) ⇒ Object

Read a message frame from the file system.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/stomp_server_ng/queue/file_queue.rb', line 39

# Read the message frame stored for +msgid+ in the queue directory of +dest+,
# delete its file, and return the frame.
#
# The file is parsed as: 8 hex characters giving the length of the marshalled
# frame, 8 hex characters giving the body length, the marshalled frame, then
# the raw body bytes.
#
# File.delete raises (Errno::*) when the file cannot be removed and otherwise
# returns a truthy count, so there is no "false" result to report; failures
# surface as exceptions.
def _readframe(dest,msgid)
  filename = "#{@queues[dest][:queue_dir]}/#{msgid}"
  data = File.open(filename, 'rb') { |f| f.read }
  frame_len = data[0, 8].hex
  body_len = data[8, 8].hex
  # NOTE(review): Marshal.load must only see trusted data; this assumes the
  # queue directory is written exclusively by _writeframe — confirm.
  frame = Marshal.load(data[16, frame_len])
  frame.body = data[16 + frame_len, body_len]
  # The message file is only removed after the frame was decoded successfully.
  File.delete(filename)
  frame
end

#_writeframe(dest, frame_todump, msgid) ⇒ Object

Write a message frame to the file system.



26
27
28
29
30
31
32
33
34
35
36
# File 'lib/stomp_server_ng/queue/file_queue.rb', line 26

# Write +frame_todump+ to the file named +msgid+ in the queue directory of
# +dest+.
#
# File layout: 8 hex characters giving the byte length of the marshalled
# frame, 8 hex characters giving the byte length of the body, the marshalled
# frame (with its body blanked), then the raw body.
#
# The frame is duplicated before its body is blanked, so the caller's frame
# keeps its body. Always returns true; I/O failures raise.
def _writeframe(dest,frame_todump,msgid)
  filename = "#{@queues[dest][:queue_dir]}/#{msgid}"
  frame = frame_todump.dup
  frame_body = frame.body
  frame.body = ''
  frame_image = Marshal.dump(frame)
  # Lengths must be counted in bytes: the file is read back in binary mode and
  # sliced by byte offsets, so a character count would truncate multibyte bodies.
  header = sprintf("%08x%08x", frame_image.bytesize, frame_body.bytesize)
  File.open(filename, 'wb') do |f|
    # Write the parts separately rather than interpolating them into one
    # string, which can raise Encoding::CompatibilityError when the binary
    # frame image is combined with a non-ASCII body.
    [header, frame_image, frame_body].each { |part| f.write(part) }
  end
  true
end