Class: AliMns::Queue
- Inherits:
-
Object
- Object
- AliMns::Queue
- Defined in:
- lib/ali_mns/queue.rb
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Class Method Summary collapse
Instance Method Summary collapse
-
#batch_receive_message(limit: 16, wait_seconds: nil) ⇒ Object
批量获取消息 本接口用于消费者批量消费队列的消息,一次BatchReceiveMessage操作最多可以获取16条消息。该操作会将取得的消息状态变成Inactive,Inactive的时间长度由Queue属性VisibilityTimeout指定(详见CreateQueue接口)。 消费者在VisibilityTimeout时间内消费成功后需要调用DeleteMessage接口删除取得的消息,否则取得的消息将会被重新置为Active,又可被消费者重新消费。.
- #create(opts = {}) ⇒ Object
- #delete ⇒ Object
-
#initialize(name) ⇒ Queue
constructor
A new instance of Queue.
- #messages_path ⇒ Object
- #peek_message ⇒ Object
- #queue_path ⇒ Object
- #receive_message(wait_seconds: nil) ⇒ Object
- #send_message(message, opts = {}) ⇒ Object
Constructor Details
#initialize(name) ⇒ Queue
Returns a new instance of Queue.
20 21 22 |
# File 'lib/ali_mns/queue.rb', line 20 def initialize name @name = name end |
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
3 4 5 |
# File 'lib/ali_mns/queue.rb', line 3 def name @name end |
Class Method Details
.[](name) ⇒ Object
8 9 10 |
# File 'lib/ali_mns/queue.rb', line 8 def [] name Queue.new(name) end |
.queues(opts = {}) ⇒ Object
12 13 14 15 16 17 |
# File 'lib/ali_mns/queue.rb', line 12 def queues opts={} = {query: "x-mns-prefix", offset: "x-mns-marker", size: "x-mns-ret-number"} mns_headers = opts.slice(*.keys).reduce({}){|mns_headers, item| k, v = *item; mns_headers.merge!([k]=>v)} response = Request.get("/", mns_headers: mns_headers) Hash.xml_array(response, "Queues", "Queue").collect{|item| Queue.new(URI(item["QueueURL"]).path.sub!(/^\//, ""))} end |
Instance Method Details
#batch_receive_message(limit: 16, wait_seconds: nil) ⇒ Object
批量获取消息本接口用于消费者批量消费队列的消息,一次BatchReceiveMessage操作最多可以获取16条消息。该操作会将取得的消息状态变成Inactive,Inactive的时间长度由Queue属性VisibilityTimeout指定(详见CreateQueue接口)。 消费者在VisibilityTimeout时间内消费成功后需要调用DeleteMessage接口删除取得的消息,否则取得的消息将会被重新置为Active,又可被消费者重新消费。
56 57 58 59 60 61 62 |
# File 'lib/ali_mns/queue.rb', line 56 def limit: 16, wait_seconds: nil request_opts = {params:{numOfMessages: limit}} request_opts[:params].merge!({waitseconds: wait_seconds}) if wait_seconds return [] unless result = Request.get(, request_opts) BatchMessage.new(self, result) end |
#create(opts = {}) ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/ali_mns/queue.rb', line 24 def create opts={} response = Request.put(queue_path) do |request| = { :VisibilityTimeout => 30, :DelaySeconds => 0, :MaximumMessageSize => 65536, :MessageRetentionPeriod => 345600, :PollingWaitSeconds => 0}.merge(opts) request.content :Queue, end end |
#delete ⇒ Object
36 37 38 |
# File 'lib/ali_mns/queue.rb', line 36 def delete Request.delete(queue_path) end |
#messages_path ⇒ Object
73 74 75 |
# File 'lib/ali_mns/queue.rb', line 73 def "/queues/#{name}/messages" end |
#peek_message ⇒ Object
64 65 66 67 |
# File 'lib/ali_mns/queue.rb', line 64 def result = Request.get(, params: {peekonly: true}) Message.new(self, result) end |
#queue_path ⇒ Object
69 70 71 |
# File 'lib/ali_mns/queue.rb', line 69 def queue_path "/#{name}" end |
#receive_message(wait_seconds: nil) ⇒ Object
47 48 49 50 51 52 |
# File 'lib/ali_mns/queue.rb', line 47 def wait_seconds: nil request_opts = {} request_opts.merge!(params:{waitseconds: wait_seconds}) if wait_seconds return nil unless result = Request.get(, request_opts) Message.new(self, result) end |
#send_message(message, opts = {}) ⇒ Object
40 41 42 43 44 45 |
# File 'lib/ali_mns/queue.rb', line 40 def , opts={} Request.post() do |request| = {:DelaySeconds => 0, :Priority => 10}.merge(opts) request.content :Message, .merge(:MessageBody => .to_s) end end |