Class: IronMQ::Queue
- Inherits:
-
ResponseBase
- Object
- ResponseBase
- IronMQ::Queue
- Defined in:
- lib/iron_mq/queues.rb
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#raw ⇒ Object
readonly
Returns the value of attribute raw.
Attributes inherited from ResponseBase
Instance Method Summary collapse
- #add_alert(alert) ⇒ Object
- #add_alerts(alerts) ⇒ Object
-
#add_subscriber(subscriber, options = {}) ⇒ Object
‘options` for backward compatibility.
- #add_subscribers(subscribers) ⇒ Object
- #alerts ⇒ Object
- #call_api_and_parse_response(meth, ext_path = "", options = {}, instantiate = true, ignore404 = false) ⇒ Object
- #clear ⇒ Object (also: #clear_queue)
-
#delete(message_id, options = {}) ⇒ Object
Backward compatibility.
-
#delete_messages(ids) ⇒ Object
Accepts an array of message ids.
-
#delete_queue ⇒ Object
Backward compatibility, better name is ‘delete`.
- #get_message(id) ⇒ Object
- #get_messages(options = {}) ⇒ Object (also: #get)
- #id ⇒ Object
- #info ⇒ Object
-
#initialize(client, queue_name) ⇒ Queue
constructor
A new instance of Queue.
-
#load ⇒ Object
this is only run once if it hasn’t been called before unless force is true, then it will force reload.
-
#messages ⇒ Object
Backward compatibility.
- #peek_messages(options = {}) ⇒ Object (also: #peek)
- #poll_messages(options = {}, &block) ⇒ Object (also: #poll)
- #post_messages(payload, options = {}) ⇒ Object (also: #post)
- #push_queue? ⇒ Boolean
- #push_type ⇒ Object
- #reload ⇒ Object
- #remove_alert(alert) ⇒ Object
- #remove_alerts(alerts) ⇒ Object
- #remove_subscriber(subscriber) ⇒ Object
- #remove_subscribers(subscribers) ⇒ Object
- #size ⇒ Object
-
#subscribers(options = {}) ⇒ Object
‘options` was kept for backward compatibility.
- #total_messages ⇒ Object
- #update(options) ⇒ Object (also: #update_queue)
Methods inherited from ResponseBase
Constructor Details
#initialize(client, queue_name) ⇒ Queue
Returns a new instance of Queue.
8 9 10 11 |
# File 'lib/iron_mq/queues.rb', line 8 def initialize(client, queue_name) @client = client @name = queue_name end |
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
6 7 8 |
# File 'lib/iron_mq/queues.rb', line 6 def name @name end |
#raw ⇒ Object (readonly)
Returns the value of attribute raw.
6 7 8 |
# File 'lib/iron_mq/queues.rb', line 6 def raw @raw end |
Instance Method Details
#add_alert(alert) ⇒ Object
125 126 127 |
# File 'lib/iron_mq/queues.rb', line 125 def add_alert(alert) add_alerts([alert]) end |
#add_alerts(alerts) ⇒ Object
129 130 131 |
# File 'lib/iron_mq/queues.rb', line 129 def add_alerts(alerts) call_api_and_parse_response(:post, '/alerts', :alerts => alerts) end |
#add_subscriber(subscriber, options = {}) ⇒ Object
‘options` for backward compatibility
99 100 101 |
# File 'lib/iron_mq/queues.rb', line 99 def add_subscriber(subscriber, = {}) add_subscribers([subscriber]) end |
#add_subscribers(subscribers) ⇒ Object
94 95 96 |
# File 'lib/iron_mq/queues.rb', line 94 def add_subscribers(subscribers) call_api_and_parse_response(:post, "/subscribers", :subscribers => subscribers) end |
#alerts ⇒ Object
141 142 143 144 145 |
# File 'lib/iron_mq/queues.rb', line 141 def alerts load return nil unless @raw['alerts'] to_alerts(@raw['alerts']) end |
#call_api_and_parse_response(meth, ext_path = "", options = {}, instantiate = true, ignore404 = false) ⇒ Object
236 237 238 239 240 241 242 243 244 245 246 247 |
# File 'lib/iron_mq/queues.rb', line 236 def call_api_and_parse_response(meth, ext_path = "", = {}, instantiate = true, ignore404 = false) r = nil response = if meth.to_s == "delete" headers = .delete(:headers) || .delete("headers") || {} @client.parse_response(@client.send(meth, "#{path(ext_path)}", , headers)) else @client.parse_response(@client.send(meth, "#{path(ext_path)}", )) end r = instantiate ? ResponseBase.new(response) : response r end |
#clear ⇒ Object Also known as: clear_queue
62 63 64 |
# File 'lib/iron_mq/queues.rb', line 62 def clear call_api_and_parse_response(:post, "/clear", {}, false, true) end |
#delete(message_id, options = {}) ⇒ Object
Backward compatibility
84 85 86 87 |
# File 'lib/iron_mq/queues.rb', line 84 def delete(, = {}) # API does not accept any options Message.new(self, {"id" => }).delete end |
#delete_messages(ids) ⇒ Object
Accepts an array of message ids
90 91 92 |
# File 'lib/iron_mq/queues.rb', line 90 def (ids) call_api_and_parse_response(:delete, "/messages", :ids => ids) end |
#delete_queue ⇒ Object
Backward compatibility, better name is ‘delete`
69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/iron_mq/queues.rb', line 69 def delete_queue r = call_api_and_parse_response(:delete) @raw = nil return r rescue Rest::HttpError => ex #if ex.code == 404 # Rest.logger.info("Delete got 404, safe to ignore.") # # return ResponseBase as normal # ResponseBase.new({"msg" => "Deleted"}, 404) #else raise ex #end end |
#get_message(id) ⇒ Object
206 207 208 209 |
# File 'lib/iron_mq/queues.rb', line 206 def (id) resp = call_api_and_parse_response(:get, "/messages/#{id}", {}, false) Message.new(self, resp) end |
#get_messages(options = {}) ⇒ Object Also known as: get
188 189 190 191 192 193 194 195 196 197 |
# File 'lib/iron_mq/queues.rb', line 188 def ( = {}) if .is_a?(String) # assume it's an id return Message.new(self, {"id" => }) end resp = call_api_and_parse_response(:get, "/messages", , false) (resp["messages"], ) end |
#id ⇒ Object
29 30 31 32 |
# File 'lib/iron_mq/queues.rb', line 29 def id load @raw['id'] end |
#info ⇒ Object
13 14 15 |
# File 'lib/iron_mq/queues.rb', line 13 def info load end |
#load ⇒ Object
this is only run once if it hasn’t been called before unless force is true, then it will force reload.
18 19 20 21 22 |
# File 'lib/iron_mq/queues.rb', line 18 def load reload if @raw.nil? @raw end |
#messages ⇒ Object
Backward compatibility
202 203 204 |
# File 'lib/iron_mq/queues.rb', line 202 def self end |
#peek_messages(options = {}) ⇒ Object Also known as: peek
211 212 213 214 215 |
# File 'lib/iron_mq/queues.rb', line 211 def ( = {}) resp = call_api_and_parse_response(:get, "/messages/peek", ) (resp["messages"], ) end |
#poll_messages(options = {}, &block) ⇒ Object Also known as: poll
219 220 221 222 223 224 225 226 227 228 229 230 231 232 |
# File 'lib/iron_mq/queues.rb', line 219 def ( = {}, &block) sleep_duration = [:sleep_duration] || 1 while true msg = (.merge(:n => 1)) if msg.nil? [:break_if_nil] ? break : sleep(sleep_duration) else yield msg # Delete message after processing msg.delete end end end |
#post_messages(payload, options = {}) ⇒ Object Also known as: post
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
# File 'lib/iron_mq/queues.rb', line 147 def (payload, = {}) batch = false instantiate = [.delete(:instantiate), .delete('instantiate')].compact.first msgs = if payload.is_a?(Array) batch = true # FIXME: This maybe better to process Array of Objects the same way as for single message. # # payload.map { |msg| options.merge(:body => msg) } # # For now user must pass objects like `[{:body => msg1}, {:body => msg2}]` payload.map { |msg| msg.merge() } else [.merge(:body => payload)] end # Do not instantiate response res = call_api_and_parse_response(:post, "/messages", {:messages => msgs}, false) if instantiate n = batch ? 2 : 1 msg_ids = res["ids"].map { |id| {"id" => id} } (msg_ids, {:n => n}) else if batch # FIXME: Return Array of ResponseBase instead, it seems more clear than raw response # # res["ids"].map { |id| ResponseBase.new({"id" => id, "msg" => res["msg"]}) } # ResponseBase.new(res) # Backward capable else ResponseBase.new({"id" => res["ids"][0], "msg" => res["msg"]}) end end end |
#push_queue? ⇒ Boolean
49 50 51 52 53 54 |
# File 'lib/iron_mq/queues.rb', line 49 def push_queue? # FIXME: `push_type` parameter in not guaranted it's push queue. # When the parameter absent it is not guaranted that queue is not push queue. ptype = push_type not (ptype.nil? || ptype.empty?) end |
#push_type ⇒ Object
44 45 46 47 |
# File 'lib/iron_mq/queues.rb', line 44 def push_type load @raw['push_type'] end |
#reload ⇒ Object
24 25 26 27 |
# File 'lib/iron_mq/queues.rb', line 24 def reload @raw = call_api_and_parse_response(:get, "", {}, false, true) self end |
#remove_alert(alert) ⇒ Object
137 138 139 |
# File 'lib/iron_mq/queues.rb', line 137 def remove_alert(alert) remove_alerts([alert]) end |
#remove_alerts(alerts) ⇒ Object
133 134 135 |
# File 'lib/iron_mq/queues.rb', line 133 def remove_alerts(alerts) call_api_and_parse_response(:delete, '/alerts', :alerts => alerts) end |
#remove_subscriber(subscriber) ⇒ Object
112 113 114 |
# File 'lib/iron_mq/queues.rb', line 112 def remove_subscriber(subscriber) remove_subscribers([subscriber]) end |
#remove_subscribers(subscribers) ⇒ Object
103 104 105 106 107 108 109 110 |
# File 'lib/iron_mq/queues.rb', line 103 def remove_subscribers(subscribers) call_api_and_parse_response(:delete, "/subscribers", { :subscribers => subscribers, :headers => {"Content-Type" => @client.content_type} }) end |
#size ⇒ Object
34 35 36 37 |
# File 'lib/iron_mq/queues.rb', line 34 def size load @raw['size'].to_i end |
#subscribers(options = {}) ⇒ Object
‘options` was kept for backward compatibility
117 118 119 120 121 122 123 |
# File 'lib/iron_mq/queues.rb', line 117 def subscribers( = {}) load if @raw['subscribers'] return @raw['subscribers'].map { |s| Subscriber.new(s, self, ) } end [] end |
#total_messages ⇒ Object
39 40 41 42 |
# File 'lib/iron_mq/queues.rb', line 39 def load @raw['total_messages'].to_i end |
#update(options) ⇒ Object Also known as: update_queue
56 57 58 |
# File 'lib/iron_mq/queues.rb', line 56 def update() call_api_and_parse_response(:post, "", ) end |