Class: StompServer::ActiveRecordQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/stomp_server/queue/activerecord_queue.rb

Instance Method Summary collapse

Constructor Details

#initialize(configdir, storagedir) ⇒ ActiveRecordQueue

Returns a new instance of ActiveRecordQueue.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/stomp_server/queue/activerecord_queue.rb', line 13

def initialize(configdir, storagedir)
  # Default configuration, use SQLite for simplicity
  db_params = {
    'adapter' => 'sqlite3',
    'database' => "#{configdir}/stompserver_development"
  }
  # Load DB configuration
  db_config = "#{configdir}/database.yml"
  puts "reading from #{db_config}"
  if File.exists? db_config
    db_params.merge! YAML::load(File.open(db_config))
  end

  puts "using #{db_params['database']} DB"

  # Setup activerecord
  ActiveRecord::Base.establish_connection(db_params)
  # Development <TODO> fix this
  ActiveRecord::Base.logger = Logger.new(STDERR)
  ActiveRecord::Base.logger.level = Logger::INFO
  # we need the connection, it can't be done earlier
  ArMessage.reset_column_information
  reload_queues
  @stompid = StompServer::StompId.new
end

Instance Method Details

#affect_msgid_and_store(frame, queue_name) ⇒ Object

store a frame (assigning it a message-id)



72
73
74
75
# File 'lib/stomp_server/queue/activerecord_queue.rb', line 72

def affect_msgid_and_store(frame, queue_name)
  msgid = assign_id(frame, queue_name)
  ArMessage.create!(:stomp_id => msgid, :frame => frame)
end

#assign_id(frame, queue_name) ⇒ Object



81
82
83
84
# File 'lib/stomp_server/queue/activerecord_queue.rb', line 81

def assign_id(frame, queue_name)
  msgid = @stompid[@frames[queue_name][:last_index] += 1]
  frame.headers['message-id'] = msgid
end

#dequeue(queue_name) ⇒ Object

Get and remove a frame from the queue



52
53
54
55
56
57
# File 'lib/stomp_server/queue/activerecord_queue.rb', line 52

def dequeue(queue_name)
  return nil unless @frames[queue_name] && !@frames[queue_name][:frames].empty?
  frame = @frames[queue_name][:frames].shift
  remove_from_store(frame.headers['message-id'])
  return frame
end

#enqueue(queue_name, frame) ⇒ Object

Add a frame to the queue



40
41
42
43
44
45
46
47
48
49
# File 'lib/stomp_server/queue/activerecord_queue.rb', line 40

def enqueue(queue_name, frame)
  unless @frames[queue_name]
    @frames[queue_name] = {
      :last_index => 0,
      :frames => [],
    }
  end
  affect_msgid_and_store(frame, queue_name)
  @frames[queue_name][:frames] << frame
end

#message_for?(queue_name) ⇒ Boolean

Returns:

  • (Boolean)


77
78
79
# File 'lib/stomp_server/queue/activerecord_queue.rb', line 77

def message_for?(queue_name)
  @frames[queue_name] && !@frames[queue_name][:frames].empty?
end

#remove_from_store(message_id) ⇒ Object

remove a frame from the store



67
68
69
# File 'lib/stomp_server/queue/activerecord_queue.rb', line 67

def remove_from_store(message_id)
  ArMessage.find_by_stomp_id(message_id).destroy
end

#requeue(queue_name, frame) ⇒ Object

Requeue the frame previously pending



60
61
62
63
64
# File 'lib/stomp_server/queue/activerecord_queue.rb', line 60

def requeue(queue_name, frame)
  @frames[queue_name][:frames] << frame
  ArMessage.create!(:stomp_id => frame.headers['message-id'],
                    :frame => frame)
end