Class: IronMQ::Queue

Inherits:
ResponseBase show all
Defined in:
lib/iron_mq/queues.rb

Instance Attribute Summary collapse

Attributes inherited from ResponseBase

#code

Instance Method Summary collapse

Methods inherited from ResponseBase

#[], #msg

Constructor Details

#initialize(client, queue_name) ⇒ Queue

Returns a new instance of Queue.



8
9
10
11
# File 'lib/iron_mq/queues.rb', line 8

def initialize(client, queue_name)
  @client = client
  @name = queue_name
end

Instance Attribute Details

#nameObject (readonly)

Returns the value of attribute name.



6
7
8
# File 'lib/iron_mq/queues.rb', line 6

def name
  @name
end

#rawObject (readonly)

Returns the value of attribute raw.



6
7
8
# File 'lib/iron_mq/queues.rb', line 6

def raw
  @raw
end

Instance Method Details

#add_alert(alert) ⇒ Object



125
126
127
# File 'lib/iron_mq/queues.rb', line 125

def add_alert(alert)
  add_alerts([alert])
end

#add_alerts(alerts) ⇒ Object



129
130
131
# File 'lib/iron_mq/queues.rb', line 129

def add_alerts(alerts)
  call_api_and_parse_response(:post, '/alerts', :alerts => alerts)
end

#add_subscriber(subscriber, options = {}) ⇒ Object

‘options` for backward compatibility



99
100
101
# File 'lib/iron_mq/queues.rb', line 99

def add_subscriber(subscriber, options = {})
  add_subscribers([subscriber])
end

#add_subscribers(subscribers) ⇒ Object



94
95
96
# File 'lib/iron_mq/queues.rb', line 94

def add_subscribers(subscribers)
  call_api_and_parse_response(:post, "/subscribers", :subscribers => subscribers)
end

#alertsObject



141
142
143
144
145
# File 'lib/iron_mq/queues.rb', line 141

def alerts
  load
  return nil unless @raw['alerts']
  to_alerts(@raw['alerts'])
end

#call_api_and_parse_response(meth, ext_path = "", options = {}, instantiate = true, ignore404 = false) ⇒ Object



236
237
238
239
240
241
242
243
244
245
246
247
# File 'lib/iron_mq/queues.rb', line 236

def call_api_and_parse_response(meth, ext_path = "", options = {}, instantiate = true, ignore404 = false)
  r = nil
  response = if meth.to_s == "delete"
               headers = options.delete(:headers) || options.delete("headers") || {}

               @client.parse_response(@client.send(meth, "#{path(ext_path)}", options, headers))
             else
               @client.parse_response(@client.send(meth, "#{path(ext_path)}", options))
             end
  r = instantiate ? ResponseBase.new(response) : response
  r
end

#clearObject Also known as: clear_queue



62
63
64
# File 'lib/iron_mq/queues.rb', line 62

def clear
  call_api_and_parse_response(:post, "/clear", {}, false, true)
end

#delete(message_id, options = {}) ⇒ Object

Backward compatibility



84
85
86
87
# File 'lib/iron_mq/queues.rb', line 84

def delete(message_id, options = {})
  # API does not accept any options
  Message.new(self, {"id" => message_id}).delete
end

#delete_messages(ids) ⇒ Object

Accepts an array of message ids



90
91
92
# File 'lib/iron_mq/queues.rb', line 90

def delete_messages(ids)
  call_api_and_parse_response(:delete, "/messages", :ids => ids)
end

#delete_queueObject

Backward compatibility, better name is ‘delete`



69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/iron_mq/queues.rb', line 69

def delete_queue
  r = call_api_and_parse_response(:delete)
  @raw = nil
  return r
rescue Rest::HttpError => ex
  #if ex.code == 404
  #  Rest.logger.info("Delete got 404, safe to ignore.")
  #  # return ResponseBase as normal
  #  ResponseBase.new({"msg" => "Deleted"}, 404)
  #else
    raise ex
  #end
end

#get_message(id) ⇒ Object



206
207
208
209
# File 'lib/iron_mq/queues.rb', line 206

def get_message(id)
  resp = call_api_and_parse_response(:get, "/messages/#{id}", {}, false)
  Message.new(self, resp)
end

#get_messages(options = {}) ⇒ Object Also known as: get



188
189
190
191
192
193
194
195
196
197
# File 'lib/iron_mq/queues.rb', line 188

def get_messages(options = {})
  if options.is_a?(String)
    # assume it's an id
    return Message.new(self, {"id" => options})
  end

  resp = call_api_and_parse_response(:get, "/messages", options, false)

  process_messages(resp["messages"], options)
end

#idObject



29
30
31
32
# File 'lib/iron_mq/queues.rb', line 29

def id
  load
  @raw['id']
end

#infoObject



13
14
15
# File 'lib/iron_mq/queues.rb', line 13

def info
  load
end

#loadObject

this is only run once if it hasn’t been called before unless force is true, then it will force reload.



18
19
20
21
22
# File 'lib/iron_mq/queues.rb', line 18

def load
  reload if @raw.nil?

  @raw
end

#messagesObject

Backward compatibility



202
203
204
# File 'lib/iron_mq/queues.rb', line 202

def messages
  self
end

#peek_messages(options = {}) ⇒ Object Also known as: peek



211
212
213
214
215
# File 'lib/iron_mq/queues.rb', line 211

def peek_messages(options = {})
  resp = call_api_and_parse_response(:get, "/messages/peek", options)

  process_messages(resp["messages"], options)
end

#poll_messages(options = {}, &block) ⇒ Object Also known as: poll



219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/iron_mq/queues.rb', line 219

def poll_messages(options = {}, &block)
  sleep_duration = options[:sleep_duration] || 1

  while true
    msg = get_messages(options.merge(:n => 1))
    if msg.nil?
      options[:break_if_nil] ? break : sleep(sleep_duration)
    else
      yield msg
      # Delete message after processing
      msg.delete
    end
  end
end

#post_messages(payload, options = {}) ⇒ Object Also known as: post



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/iron_mq/queues.rb', line 147

def post_messages(payload, options = {})
  batch = false

  instantiate = [options.delete(:instantiate),
                 options.delete('instantiate')].compact.first

  msgs = if payload.is_a?(Array)
           batch = true
           # FIXME: This maybe better to process Array of Objects the same way as for single message.
           #
           #          payload.map { |msg| options.merge(:body => msg) }
           #
           #        For now user must pass objects like `[{:body => msg1}, {:body => msg2}]`
           payload.map { |msg| msg.merge(options) }
         else
           [options.merge(:body => payload)]
         end

  # Do not instantiate response
  res = call_api_and_parse_response(:post, "/messages", {:messages => msgs}, false)

  if instantiate
    n = batch ? 2 : 1
    msg_ids = res["ids"].map { |id| {"id" => id} }

    process_messages(msg_ids, {:n => n})
  else
    if batch
      # FIXME: Return Array of ResponseBase instead, it seems more clear than raw response
      #
      #          res["ids"].map { |id| ResponseBase.new({"id" => id, "msg" => res["msg"]}) }
      #
      ResponseBase.new(res) # Backward capable
    else
      ResponseBase.new({"id" => res["ids"][0], "msg" => res["msg"]})
    end
  end
end

#push_queue?Boolean

Returns:

  • (Boolean)


49
50
51
52
53
54
# File 'lib/iron_mq/queues.rb', line 49

def push_queue?
  # FIXME: `push_type` parameter in not guaranted it's push queue.
  #        When the parameter absent it is not guaranted that queue is not push queue.
  ptype = push_type
  not (ptype.nil? || ptype.empty?)
end

#push_typeObject



44
45
46
47
# File 'lib/iron_mq/queues.rb', line 44

def push_type
  load
  @raw['push_type']
end

#reloadObject



24
25
26
27
# File 'lib/iron_mq/queues.rb', line 24

def reload
  @raw = call_api_and_parse_response(:get, "", {}, false, true)
  self
end

#remove_alert(alert) ⇒ Object



137
138
139
# File 'lib/iron_mq/queues.rb', line 137

def remove_alert(alert)
  remove_alerts([alert])
end

#remove_alerts(alerts) ⇒ Object



133
134
135
# File 'lib/iron_mq/queues.rb', line 133

def remove_alerts(alerts)
  call_api_and_parse_response(:delete, '/alerts', :alerts => alerts)
end

#remove_subscriber(subscriber) ⇒ Object



112
113
114
# File 'lib/iron_mq/queues.rb', line 112

def remove_subscriber(subscriber)
  remove_subscribers([subscriber])
end

#remove_subscribers(subscribers) ⇒ Object



103
104
105
106
107
108
109
110
# File 'lib/iron_mq/queues.rb', line 103

def remove_subscribers(subscribers)
  call_api_and_parse_response(:delete,
                              "/subscribers",
                              {
                                  :subscribers => subscribers,
                                  :headers => {"Content-Type" => @client.content_type}
                              })
end

#sizeObject



34
35
36
37
# File 'lib/iron_mq/queues.rb', line 34

def size
  load
  @raw['size'].to_i
end

#subscribers(options = {}) ⇒ Object

‘options` was kept for backward compatibility



117
118
119
120
121
122
123
# File 'lib/iron_mq/queues.rb', line 117

def subscribers(options = {})
  load
  if @raw['subscribers']
    return @raw['subscribers'].map { |s| Subscriber.new(s, self, options) }
  end
  []
end

#total_messagesObject



39
40
41
42
# File 'lib/iron_mq/queues.rb', line 39

def total_messages
  load
  @raw['total_messages'].to_i
end

#update(options) ⇒ Object Also known as: update_queue



56
57
58
# File 'lib/iron_mq/queues.rb', line 56

def update(options)
  call_api_and_parse_response(:post, "", options)
end