Class: AliMns::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/ali_mns/queue.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name) ⇒ Queue

Returns a new instance of Queue.



20
21
22
# File 'lib/ali_mns/queue.rb', line 20

# Builds a queue handle around the given queue name.
#
# @param name [Object] stored as-is in @name; the path helpers of this class
#   interpolate it into request paths
def initialize(name)
  @name = name
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



3
4
5
# File 'lib/ali_mns/queue.rb', line 3

# Reader for the queue name.
#
# @return [Object] the current value of @name
def name
  @name
end

Class Method Details

.[](name) ⇒ Object



8
9
10
# File 'lib/ali_mns/queue.rb', line 8

# Bracket-style shorthand for constructing a queue handle.
#
# @param name [Object] queue name handed straight to Queue.new
# @return [Queue] a new Queue for +name+
def [](name)
  Queue.new(name)
end

.queues(opts = {}) ⇒ Object



12
13
14
15
16
17
# File 'lib/ali_mns/queue.rb', line 12

# Lists queues from the service and wraps each entry in a Queue.
#
# Recognised option keys are translated into request headers:
#   :query  -> "x-mns-prefix"
#   :offset -> "x-mns-marker"
#   :size   -> "x-mns-ret-number"
# Any other key in +opts+ is ignored.
#
# @param opts [Hash] optional listing filters (see the mapping above)
# @return [Array<Queue>] one Queue per "Queue" entry parsed from the response,
#   named after the path of its "QueueURL" with the leading slash removed
def queues(opts = {})
  header_names = {query: "x-mns-prefix", offset: "x-mns-marker", size: "x-mns-ret-number"}
  mns_headers = header_names.each_with_object({}) do |(option, header), headers|
    headers[header] = opts[option] if opts.key?(option)
  end

  response = Request.get("/", mns_headers: mns_headers)
  Hash.xml_array(response, "Queues", "Queue").map do |item|
    # Non-bang sub on purpose: String#sub! returns nil when nothing was
    # replaced, which would produce a Queue whose name is nil for any URL
    # whose path does not start with "/" (e.g. an empty path).
    Queue.new(URI(item["QueueURL"]).path.sub(%r{\A/}, ""))
  end
end

Instance Method Details

#batch_receive_message(limit: 16, wait_seconds: nil) ⇒ Object

批量获取消息。本接口用于消费者批量消费队列的消息,一次BatchReceiveMessage操作最多可以获取16条消息。该操作会将取得的消息状态变成Inactive,Inactive的时间长度由Queue属性VisibilityTimeout指定(详见CreateQueue接口)。 消费者在VisibilityTimeout时间内消费成功后需要调用DeleteMessage接口删除取得的消息,否则取得的消息将会被重新置为Active,又可被消费者重新消费。



56
57
58
59
60
61
62
# File 'lib/ali_mns/queue.rb', line 56

# Receives several messages from this queue in one request.
#
# Per the API description accompanying this method: one BatchReceiveMessage
# call returns at most 16 messages; fetched messages become Inactive for the
# queue's VisibilityTimeout and turn Active again (and can be consumed again)
# unless the consumer deletes them via DeleteMessage within that window.
#
# @param limit [Integer] sent as the numOfMessages request parameter
# @param wait_seconds [Integer, nil] sent as the waitseconds request parameter
#   when truthy; omitted otherwise
# @return [BatchMessage, Array] a BatchMessage wrapping the response, or an
#   empty array when the request yields a falsy result
def batch_receive_message(limit: 16, wait_seconds: nil)
  params = {numOfMessages: limit}
  params[:waitseconds] = wait_seconds if wait_seconds
  request_opts = {params: params}

  result = Request.get(messages_path, request_opts)
  result ? BatchMessage.new(self, result) : []
end

#create(opts = {}) ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
# File 'lib/ali_mns/queue.rb', line 24

# Creates this queue by issuing a PUT-style request against queue_path.
#
# The request content is a :Queue element whose attributes start from the
# defaults below; entries in +opts+ override or extend them.
#
# @param opts [Hash] queue attributes merged over the defaults
# @return [Object] whatever Request.put returns
def create(opts = {})
  defaults = {
    VisibilityTimeout: 30,
    DelaySeconds: 0,
    MaximumMessageSize: 65536,
    MessageRetentionPeriod: 345600,
    PollingWaitSeconds: 0
  }

  Request.put(queue_path) do |request|
    request.content(:Queue, defaults.merge(opts))
  end
end

#delete ⇒ Object



36
37
38
# File 'lib/ali_mns/queue.rb', line 36

# Deletes this queue by passing queue_path to Request.delete.
#
# @return [Object] whatever Request.delete returns
def delete
  Request.delete(queue_path)
end

#messages_path ⇒ Object



73
74
75
# File 'lib/ali_mns/queue.rb', line 73

# Request path of this queue's message collection.
#
# @return [String] "/queues/<name>/messages", with +name+ interpolated
def messages_path
  "/queues/#{name}/messages"
end

#peek_message ⇒ Object



64
65
66
67
# File 'lib/ali_mns/queue.rb', line 64

# Fetches a message with the peekonly request parameter set to true.
#
# Mirrors receive_message: when the request yields a falsy result, nil is
# returned instead of building a Message around a missing response.
#
# @return [Message, nil] a Message wrapping the response, or nil when the
#   request returned nothing
def peek_message
  result = Request.get(messages_path, params: {peekonly: true})
  return nil unless result

  Message.new(self, result)
end

#queue_path ⇒ Object



69
70
71
# File 'lib/ali_mns/queue.rb', line 69

# Request path used for operations on the queue itself (create, delete).
#
# NOTE(review): messages_path prefixes the name with "/queues/" while this
# path does not — confirm that the asymmetry is intended.
#
# @return [String] "/<name>", with +name+ interpolated
def queue_path
  "/#{name}"
end

#receive_message(wait_seconds: nil) ⇒ Object



47
48
49
50
51
52
# File 'lib/ali_mns/queue.rb', line 47

# Receives a single message from this queue.
#
# @param wait_seconds [Integer, nil] sent as the waitseconds request parameter
#   when truthy; otherwise the request carries no extra options
# @return [Message, nil] a Message wrapping the response, or nil when the
#   request yields a falsy result
def receive_message(wait_seconds: nil)
  request_opts = wait_seconds ? {params: {waitseconds: wait_seconds}} : {}
  result = Request.get(messages_path, request_opts)
  Message.new(self, result) if result
end

#send_message(message, opts = {}) ⇒ Object



40
41
42
43
44
45
# File 'lib/ali_mns/queue.rb', line 40

# Posts a message to this queue.
#
# The request content is a :Message element built from the defaults
# (DelaySeconds 0, Priority 10), overridden by +opts+, with MessageBody always
# set to the string form of +message+.
#
# @param message [#to_s] payload; converted with to_s
# @param opts [Hash] message attributes merged over the defaults
# @return [Object] whatever Request.post returns
def send_message(message, opts = {})
  Request.post(messages_path) do |request|
    attributes = {DelaySeconds: 0, Priority: 10}.merge(opts)
    attributes[:MessageBody] = message.to_s
    request.content(:Message, attributes)
  end
end