Class: Qsagi::StandardQueue
- Inherits:
-
Object
- Object
- Qsagi::StandardQueue
- Defined in:
- lib/qsagi/standard_queue.rb
Instance Attribute Summary collapse
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
Instance Method Summary collapse
- #_message_class ⇒ Object
- #_serializer ⇒ Object
- #ack(message) ⇒ Object
- #clear ⇒ Object
- #connect ⇒ Object
- #disconnect ⇒ Object
-
#initialize(options = {}) ⇒ StandardQueue
constructor
A new instance of StandardQueue.
- #length ⇒ Object
- #pop(options = {}) ⇒ Object
- #push(message) ⇒ Object
- #reconnect ⇒ Object
- #reject(message, options = {}) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ StandardQueue
Returns a new instance of StandardQueue.
5 6 7 |
# File 'lib/qsagi/standard_queue.rb', line 5 def initialize(={}) @options = end |
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
3 4 5 |
# File 'lib/qsagi/standard_queue.rb', line 3 def channel @channel end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
3 4 5 |
# File 'lib/qsagi/standard_queue.rb', line 3 def @options end |
Instance Method Details
#_message_class ⇒ Object
64 65 66 |
# File 'lib/qsagi/standard_queue.rb', line 64 def [:message_class] end |
#_serializer ⇒ Object
68 69 70 |
# File 'lib/qsagi/standard_queue.rb', line 68 def _serializer [:serializer] end |
#ack(message) ⇒ Object
9 10 11 |
# File 'lib/qsagi/standard_queue.rb', line 9 def ack() @channel.ack(.delivery_tag, false) end |
#clear ⇒ Object
17 18 19 |
# File 'lib/qsagi/standard_queue.rb', line 17 def clear @queue.purge end |
#connect ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/qsagi/standard_queue.rb', line 21 def connect @client = Bunny.new( :host => [:host], :port => [:port], :heartbeat => [:heartbeat], :continuation_timeout => [:continuation_timeout], :username => [:username], :password => [:password], ) @client.start @channel = @client.create_channel @exchange = @channel.exchange([:exchange], [:exchange_options]) @queue = @channel.queue([:queue_name], :durable => [:durable], :arguments => [:queue_arguments]) @queue.bind(@exchange, :routing_key => [:queue_name]) unless [:exchange].empty? end |
#disconnect ⇒ Object
37 38 39 |
# File 'lib/qsagi/standard_queue.rb', line 37 def disconnect @client.close unless @client.nil? end |
#length ⇒ Object
41 42 43 |
# File 'lib/qsagi/standard_queue.rb', line 41 def length @queue.status[:message_count] end |
#pop(options = {}) ⇒ Object
45 46 47 48 49 50 51 52 |
# File 'lib/qsagi/standard_queue.rb', line 45 def pop( = {}) auto_ack = .fetch(:auto_ack, true) delivery_info, properties, = @queue.pop(:manual_ack => !auto_ack) unless .nil? .new(delivery_info, _serializer.deserialize()) end end |
#push(message) ⇒ Object
54 55 56 57 |
# File 'lib/qsagi/standard_queue.rb', line 54 def push() = [:serializer].serialize() @exchange.publish(, :routing_key => @queue.name, :persistent => [:persistent], :mandatory => [:mandatory]) end |
#reconnect ⇒ Object
59 60 61 62 |
# File 'lib/qsagi/standard_queue.rb', line 59 def reconnect disconnect connect end |
#reject(message, options = {}) ⇒ Object
13 14 15 |
# File 'lib/qsagi/standard_queue.rb', line 13 def reject(, ={}) @channel.reject(.delivery_tag, .fetch(:requeue, true)) end |