Class: Brown::Message
Instance Attribute Summary collapse
-
#metadata ⇒ Object
readonly
Returns the value of attribute metadata.
-
#payload ⇒ Object
readonly
Returns the value of attribute payload.
Instance Method Summary collapse
- #ack(multiple = false) ⇒ Object
-
#initialize(payload, metadata, requeue_queue, requeue_options, opts = {}, &blk) ⇒ Message
constructor
A new instance of Message.
- #nak(opts = {}) ⇒ Object (also: #reject)
- #requeue ⇒ Object
Methods included from Logger
#backtrace, #log_level, #logger
Constructor Details
#initialize(payload, metadata, requeue_queue, requeue_options, opts = {}, &blk) ⇒ Message
Returns a new instance of Message.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/brown/message.rb', line 10

# Builds a message from a raw delivery: stores the delivery metadata and the
# requeue settings, fills in default requeue options, decodes the payload with
# the ACL class looked up from `metadata.type`, and then calls the block.
#
# NOTE: the defaults (:strategy, :on_requeue, :on_requeue_limit) are written
# with `||=` directly into the `requeue_options` hash supplied by the caller;
# no copy is taken, so the caller's hash is modified.
#
# @param payload [Object] raw payload handed to `klass.new.parse_from_string`
# @param metadata [Object] delivery metadata; must respond to `type` here
# @param requeue_queue [Object] queue used for requeues; `name` is read by the
#   default logging callbacks
# @param requeue_options [Hash] requeue settings; recognised keys set here are
#   :strategy, :on_requeue and :on_requeue_limit
# @param opts [Hash] when `opts[:auto_ack]` is truthy the message is acked
#   after the block returns
# @yield [payload, message] the decoded payload and this message
# @raise [RuntimeError] when no ACL class is registered for `metadata.type`
def initialize(payload, metadata, requeue_queue, requeue_options, opts = {}, &blk)
  @metadata = metadata

  @requeue_queue = requeue_queue
  @requeue_options = requeue_options

  @requeue_options[:strategy] ||= :linear

  # Default callback invoked by #requeue each time a requeue is scheduled.
  @requeue_options[:on_requeue] ||= ->(count, total_count, cumulative_delay) {
    logger.info { "Requeuing (#{@requeue_options[:strategy]}) message on queue: #{@requeue_queue.name}, count: #{count} of #{total_count}." }
  }

  # Default callback invoked by #requeue once the requeue limit is reached.
  @requeue_options[:on_requeue_limit] ||= ->(message, count, total_count, cumulative_delay) {
    logger.info { "Not attempting any more requeues, requeue limit reached: #{total_count} for queue: #{@requeue_queue.name}, cummulative delay: #{cumulative_delay}s." }
  }

  klass = Brown::ACLLookup.get_by_hash(metadata.type)
  raise RuntimeError, "Unknown ACL: #{metadata.type}" if klass.nil?

  @payload = klass.new.parse_from_string(payload)

  # The block runs before the optional auto-ack, so an exception raised in the
  # block leaves the message un-acked.
  blk.call(@payload, self)
  ack if opts[:auto_ack]
end
Instance Attribute Details
#metadata ⇒ Object (readonly)
Returns the value of attribute metadata.
8 9 10 |
# File 'lib/brown/message.rb', line 8

# Returns the value of attribute metadata (the object stored in `@metadata`).
#
# @return [Object] the delivery metadata, or nil if it has not been set
def metadata
  @metadata
end
#payload ⇒ Object (readonly)
Returns the value of attribute payload.
8 9 10 |
# File 'lib/brown/message.rb', line 8

# Returns the value of attribute payload (the object stored in `@payload`).
#
# @return [Object] the decoded payload, or nil if it has not been set
def payload
  @payload
end
Instance Method Details
#ack(multiple = false) ⇒ Object
35 36 37 |
# File 'lib/brown/message.rb', line 35

# Acknowledges the message by delegating to `@metadata.ack`.
#
# @param multiple [Boolean] forwarded unchanged to `@metadata.ack`
#   (presumably "ack everything up to this delivery" as in AMQP basic.ack;
#   confirm against the metadata class)
# @return [Object] whatever `@metadata.ack` returns
def ack(multiple = false)
  @metadata.ack(multiple)
end
#nak(opts = {}) ⇒ Object Also known as: reject
39 40 41 |
# File 'lib/brown/message.rb', line 39

# Negatively acknowledges the message by delegating to `@metadata.reject`.
#
# @param opts [Hash] forwarded unchanged to `@metadata.reject`
# @return [Object] whatever `@metadata.reject` returns
def nak(opts = {})
  @metadata.reject(opts)
end
#requeue ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/brown/message.rb', line 45

# Schedules this message to be republished on the requeue queue, or reports
# that the requeue limit has been reached.
#
# While `current_requeue_number` (defined outside this listing) is below
# `@requeue_options[:count]`, a timer is registered with `EM.add_timer` and
# `@requeue_options[:on_requeue]` is called immediately (not when the timer
# fires). Otherwise `@requeue_options[:on_requeue_limit]` is called and
# nothing is published.
#
# Timer delay per strategy, with n = current_requeue_number and
# d = @requeue_options[:delay]:
#   :linear                       d * (n + 1)
#   :exponential                  d * 2**n
#   :exponential_no_initial_delay d * (2**n - 1)   (zero on the first requeue)
#
# @return [Object] the return value of whichever callback was invoked
# @raise [RuntimeError] when the configured strategy is not one of the above
def requeue
  if current_requeue_number < @requeue_options[:count]
    # Despite its name this is the delay applied to this one requeue.
    cumulative_delay = case @requeue_options[:strategy].to_sym
    when :linear
      @requeue_options[:delay] * (current_requeue_number + 1)
    when :exponential
      @requeue_options[:delay] * (2 ** current_requeue_number)
    when :exponential_no_initial_delay
      @requeue_options[:delay] * (2 ** current_requeue_number - 1)
    else
      raise RuntimeError, "Unknown requeue strategy #{@requeue_options[:strategy].to_sym.inspect}"
    end

    EM.add_timer(cumulative_delay) do
      # Republish with the same metadata, bumping the 'requeue' header so the
      # attempt count travels with the message.
      new_headers = (@metadata.headers || {}).merge('requeue' => current_requeue_number + 1)
      @requeue_queue.publish(@payload, @metadata.to_hash.merge(:headers => new_headers))
    end

    @requeue_options[:on_requeue].call(current_requeue_number + 1, @requeue_options[:count], cumulative_delay)
  else
    @requeue_options[:on_requeue_limit].call(@payload, current_requeue_number + 1, @requeue_options[:count], @requeue_options[:delay] * current_requeue_number)
  end
end