Class: Cloudist::Queue
Overview
NOTE: Queue is deprecated; please use BasicQueue instead.
Instance Attribute Summary collapse
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#ex ⇒ Object
readonly
Returns the value of attribute ex.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#q ⇒ Object
readonly
Returns the value of attribute q.
Instance Method Summary collapse
- #destroy ⇒ Object
-
#initialize(name, options = {}) ⇒ Queue
constructor
A new instance of Queue.
- #inspect ⇒ Object
- #log ⇒ Object
- #publish(msg) ⇒ Object
-
#publish_to_ex(body, headers = {}) ⇒ Object
def channel self.class.channel end.
- #publish_to_q(body, headers = {}) ⇒ Object
- #purge ⇒ Object
- #subscribe(options = {}, &block) ⇒ Object
- #tag ⇒ Object
- #teardown ⇒ Object
Constructor Details
#initialize(name, options = {}) ⇒ Queue
Returns a new instance of Queue.
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/cloudist/queue.rb', line 11 def initialize(name, options = {}) self.class.cached_queues ||= {} options = { :auto_delete => false, :durable => true }.update(options) @name, @options = name, options setup p self.cached_queues.keys log.debug(tag) purge end |
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
7 8 9 |
# File 'lib/cloudist/queue.rb', line 7 def channel @channel end |
#ex ⇒ Object (readonly)
Returns the value of attribute ex.
7 8 9 |
# File 'lib/cloudist/queue.rb', line 7 def ex @ex end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
7 8 9 |
# File 'lib/cloudist/queue.rb', line 7 def name @name end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
7 8 9 |
# File 'lib/cloudist/queue.rb', line 7 def options @options end |
#q ⇒ Object (readonly)
Returns the value of attribute q.
7 8 9 |
# File 'lib/cloudist/queue.rb', line 7 def q @q end |
Instance Method Details
#destroy ⇒ Object
84 85 86 |
# File 'lib/cloudist/queue.rb', line 84 def destroy teardown end |
#inspect ⇒ Object
32 33 34 |
# File 'lib/cloudist/queue.rb', line 32 def inspect "<#{self.class.name} name=#{name} exchange=#{ex ? ex.name : 'nil'}>" end |
#log ⇒ Object
36 37 38 |
# File 'lib/cloudist/queue.rb', line 36 def log Cloudist.log end |
#publish(msg) ⇒ Object
47 48 49 50 51 52 53 54 55 56 |
# File 'lib/cloudist/queue.rb', line 47 def publish(msg) raise ArgumentError, "Publish expects a Cloudist::Message object" unless msg.is_a?(Cloudist::Message) body, headers = msg.encoded # EM.defer { publish_to_ex(body, headers) # } p msg.body.to_hash end |
#publish_to_ex(body, headers = {}) ⇒ Object
def channel
self.class.channel
end
def q
self.class.q
end
def ex
self.class.ex
end
70 71 72 |
# File 'lib/cloudist/queue.rb', line 70 def publish_to_ex(body, headers = {}) ex.publish(body, headers) end |
#publish_to_q(body, headers = {}) ⇒ Object
74 75 76 |
# File 'lib/cloudist/queue.rb', line 74 def publish_to_q(body, headers = {}) q.publish(body, headers) end |
#purge ⇒ Object
28 29 30 |
# File 'lib/cloudist/queue.rb', line 28 def purge q.purge end |
#subscribe(options = {}, &block) ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/cloudist/queue.rb', line 88 def subscribe(options = {}, &block) options[:ack] = true q.subscribe(options) do |queue_header, | request = Cloudist::Request.new(self, , queue_header) msg = Cloudist::Message.new(*request.) EM.defer do begin raise Cloudist::ExpiredMessage if request.expired? yield msg rescue Cloudist::ExpiredMessage log.error "AMQP Message Timeout: #{tag} ttl=#{request.ttl} age=#{request.age}" rescue Exception => e Cloudist.handle_error(e) ensure request.ack unless Cloudist.closing? end end end end |
#tag ⇒ Object
40 41 42 43 44 45 |
# File 'lib/cloudist/queue.rb', line 40 def tag [].tap { |a| a << "queue=#{q.name}" if q a << "exchange=#{ex.name}" if ex }.join(' ') end |
#teardown ⇒ Object
78 79 80 81 82 |
# File 'lib/cloudist/queue.rb', line 78 def teardown q.unsubscribe channel.close log.debug "AMQP Unsubscribed: #{tag}" end |