Class: Qsagi::StandardQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/qsagi/standard_queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ StandardQueue

Returns a new instance of StandardQueue.



5
6
7
# File 'lib/qsagi/standard_queue.rb', line 5

# Stores the given options hash for later use by the other methods.
# The hash is kept by reference (no copy is made) and no connection is
# opened here.
#
# @param options [Hash] configuration read later through the options reader
def initialize(options={})
  @options = options
end

Instance Attribute Details

#channel ⇒ Object (readonly)

Returns the value of attribute channel.



3
4
5
# File 'lib/qsagi/standard_queue.rb', line 3

# Reader for the @channel instance variable.
#
# @return [Object, nil] the current value of @channel; nil while it has
#   not been assigned
def channel
  @channel
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



3
4
5
# File 'lib/qsagi/standard_queue.rb', line 3

# Reader for the @options instance variable.
#
# @return [Object] the options object stored by the constructor
def options
  @options
end

Instance Method Details

#_message_class ⇒ Object



71
72
73
# File 'lib/qsagi/standard_queue.rb', line 71

# Looks up the :message_class entry of the options.
#
# @return [Object, nil] the value stored under :message_class
def _message_class
  options[:message_class]
end

#_serializer ⇒ Object



75
76
77
# File 'lib/qsagi/standard_queue.rb', line 75

# Looks up the :serializer entry of the options.
#
# @return [Object, nil] the value stored under :serializer
def _serializer
  options[:serializer]
end

#ack(message) ⇒ Object



9
10
11
# File 'lib/qsagi/standard_queue.rb', line 9

# Acknowledges one message on the channel, identified by its delivery tag.
#
# @param message [#delivery_tag] the message to acknowledge
# @return [Object] whatever the channel's ack call returns
def ack(message)
  tag = message.delivery_tag
  # Second positional argument of the channel's ack call; in Bunny this is
  # the "multiple" flag, and it is always passed as false here.
  multiple = false
  @channel.ack(tag, multiple)
end

#clear ⇒ Object



17
18
19
# File 'lib/qsagi/standard_queue.rb', line 17

# Calls purge on the queue object held in @queue.
# NOTE(review): with a Bunny queue this presumably discards every message
# currently waiting in the queue - confirm against the Bunny docs.
#
# @return [Object] whatever the queue's purge call returns
def clear
  @queue.purge
end

#connect ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/qsagi/standard_queue.rb', line 21

# Opens a Bunny connection and sets up @client, @channel, @exchange and
# @queue from the stored options. The queue is bound to the exchange only
# when the configured exchange name is non-empty.
#
# @return [Object, nil] the result of the queue's bind call, or nil when
#   the exchange name is empty and no binding is made
def connect
  connection_keys = [
    :host, :port, :heartbeat, :continuation_timeout, :username,
    :password, :connect_timeout, :read_timeout, :write_timeout
  ]
  client_options = {}
  connection_keys.each { |key| client_options[key] = options[key] }

  # A nil :logger is never handed to Bunny.new; the key is only present
  # when a logger was actually configured.
  logger = options[:logger]
  client_options[:logger] = logger unless logger.nil?

  @client = Bunny.new(client_options)
  @client.start
  @channel = @client.create_channel
  @exchange = @channel.exchange(options[:exchange], options[:exchange_options])

  queue_name = options[:queue_name]
  @queue = @channel.queue(
    queue_name,
    :durable => options[:durable],
    :arguments => options[:queue_arguments]
  )
  return if options[:exchange].empty?

  # The queue name doubles as the routing key for the binding.
  @queue.bind(@exchange, :routing_key => queue_name)
end

#disconnect ⇒ Object



44
45
46
# File 'lib/qsagi/standard_queue.rb', line 44

# Closes the client stored in @client, if there is one.
#
# @return [Object, nil] the result of the client's close call, or nil when
#   @client is nil
def disconnect
  return if @client.nil?

  @client.close
end

#length ⇒ Object



48
49
50
# File 'lib/qsagi/standard_queue.rb', line 48

# Reads the :message_count entry from the queue's status.
#
# @return [Object] the value stored under :message_count in @queue.status
def length
  queue_status = @queue.status
  queue_status[:message_count]
end

#pop(options = {}) ⇒ Object



52
53
54
55
56
57
58
59
# File 'lib/qsagi/standard_queue.rb', line 52

# Pops one message off the queue and wraps it in the configured message
# class, with the payload run through the configured serializer.
#
# @param options [Hash] per-call settings; :auto_ack (default true) is
#   inverted and passed to the queue as :manual_ack
# @return [Object, nil] a new message-class instance built from the
#   delivery info and the deserialized payload, or nil when the queue
#   returned no payload
def pop(options = {})
  manual_ack = !options.fetch(:auto_ack, true)
  # The second element returned by the queue is not used here.
  delivery_info, _properties, payload = @queue.pop(:manual_ack => manual_ack)
  return if payload.nil?

  _message_class.new(delivery_info, _serializer.deserialize(payload))
end

#push(message) ⇒ Object



61
62
63
64
# File 'lib/qsagi/standard_queue.rb', line 61

# Serializes the message with the configured serializer and publishes it
# to the exchange, using the queue's name as the routing key.
#
# @param message [Object] the value handed to the serializer
# @return [Object] whatever the exchange's publish call returns
def push(message)
  payload = options[:serializer].serialize(message)
  @exchange.publish(
    payload,
    :routing_key => @queue.name,
    :persistent => options[:persistent],
    :mandatory => options[:mandatory]
  )
end

#reconnect ⇒ Object



66
67
68
69
# File 'lib/qsagi/standard_queue.rb', line 66

# Calls disconnect and then connect, in that order.
#
# @return [Object, nil] whatever connect returns
def reconnect
  disconnect
  connect
end

#reject(message, options = {}) ⇒ Object



13
14
15
# File 'lib/qsagi/standard_queue.rb', line 13

# Rejects one message on the channel, identified by its delivery tag.
#
# @param message [#delivery_tag] the message to reject
# @param options [Hash] per-call settings; :requeue (default true) is
#   passed through as the second argument of the channel's reject call
# @return [Object] whatever the channel's reject call returns
def reject(message, options = {})
  tag = message.delivery_tag
  requeue = options.fetch(:requeue, true)
  @channel.reject(tag, requeue)
end