Module: Qsagi::Queue

Defined in:
lib/qsagi/queue.rb

Defined Under Namespace

Modules: ClassMethods

Class Method Summary

Instance Method Summary

Class Method Details

.included(klass) ⇒ Object



49
50
51
# File 'lib/qsagi/queue.rb', line 49

# Mixin hook: extends the including class with ClassMethods so that the
# methods defined there become available on the class itself.
#
# @param klass [Module] the class or module that included this module
# @return [Object] whatever klass.extend returns
def self.included(klass)
  klass.extend(ClassMethods)
end

Instance Method Details

#ack(message) ⇒ Object



3
4
5
# File 'lib/qsagi/queue.rb', line 3

# Acknowledges +message+ on the underlying queue, identifying it by the
# delivery tag the message carries.
#
# @param message [#delivery_tag] a previously received message
# @return [Object] whatever @queue.ack returns
def ack(message)
  tag = message.delivery_tag
  @queue.ack(:delivery_tag => tag)
end

#clear ⇒ Object



7
8
9
10
11
12
# File 'lib/qsagi/queue.rb', line 7

# Empties the queue by popping messages until a pop reports
# :queue_empty as its payload. Popped messages are discarded.
#
# @return [nil]
def clear
  until @queue.pop[:payload] == :queue_empty
    # Nothing to do with the popped message; keep draining.
  end
end

#connect ⇒ Object



14
15
16
17
18
19
20
# File 'lib/qsagi/queue.rb', line 14

# Opens a Bunny connection using the host and port configured on the
# class, declares the durable queue and the exchange, and binds the queue
# to the exchange when a non-empty exchange name is configured.
#
# Sets @client, @queue and @exchange as a side effect.
#
# @return [Object, nil] the result of the bind call, or nil when the
#   configured exchange name is empty and no binding is made
def connect
  settings = self.class

  @client = Bunny.new(:host => settings.host, :port => settings.port)
  @client.start

  # NOTE(review): "x-ha-policy" => "all" presumably asks the broker to mirror
  # the queue on every node -- confirm the deployed broker still honors it.
  @queue = @client.queue(
    settings.queue_name,
    :durable => true,
    :arguments => {"x-ha-policy" => "all"}
  )
  @exchange = @client.exchange(settings._exchange)

  # An empty exchange name means there is nothing to bind explicitly.
  return if settings._exchange.empty?

  @queue.bind(@exchange, :key => settings.queue_name)
end

#disconnect ⇒ Object



22
23
24
# File 'lib/qsagi/queue.rb', line 22

# Closes the client's socket, if a client has been created.
#
# close_socket is invoked through send, which bypasses method visibility.
#
# @return [Object, nil] the result of close_socket, or nil when there is
#   no client
def disconnect
  return if @client.nil?

  @client.send(:close_socket)
end

#length ⇒ Object



26
27
28
# File 'lib/qsagi/queue.rb', line 26

# Reports the :message_count entry of the queue's status.
#
# @return [Object] the value stored under :message_count in @queue.status
def length
  queue_status = @queue.status
  queue_status[:message_count]
end

#pop(options = {}) ⇒ Object



30
31
32
33
34
35
36
37
# File 'lib/qsagi/queue.rb', line 30

# Pops one message from the queue and wraps it in the configured message
# class together with its deserialized payload.
#
# @param options [Hash] recognizes :auto_ack (defaults to true); the queue
#   is popped with :ack set to the negation of that value
# @return [Object, nil] a new instance of the configured message class, or
#   nil when the queue reports :queue_empty
def pop(options = {})
  requires_ack = !options.fetch(:auto_ack, true)
  raw_message = @queue.pop(:ack => requires_ack)
  return if raw_message[:payload] == :queue_empty

  message_class = self.class._message_class
  body = self.class._serializer.deserialize(raw_message[:payload])
  message_class.new(raw_message, body)
end

#push(message) ⇒ Object



39
40
41
42
# File 'lib/qsagi/queue.rb', line 39

# Serializes +message+ with the configured serializer and publishes it to
# the exchange, keyed by the queue's name, with the :persistent and
# :mandatory options set to true.
#
# @param message [Object] the value handed to the serializer
# @return [Object] whatever @exchange.publish returns
def push(message)
  payload = self.class._serializer.serialize(message)
  @exchange.publish(
    payload,
    :key => @queue.name,
    :persistent => true,
    :mandatory => true
  )
end

#reconnect ⇒ Object



44
45
46
47
# File 'lib/qsagi/queue.rb', line 44

# Re-establishes the connection by calling disconnect and then connect.
#
# @return [Object, nil] whatever connect returns
def reconnect
  # Tear down the existing client socket first (disconnect is a no-op when
  # no client exists), then build a fresh client, queue and exchange.
  disconnect
  connect
end