Class: Brown::Message

Inherits:
Object
  • Object
show all
Includes:
Logger
Defined in:
lib/brown/message.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logger

#backtrace, #log_level, #logger

Constructor Details

#initialize(payload, metadata, requeue_queue, requeue_options, opts = {}, &blk) ⇒ Message

Returns a new instance of Message.

Raises:

  • (RuntimeError) — if Brown::ACLLookup.get_by_hash returns nil for the metadata's type ("Unknown ACL")


10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/brown/message.rb', line 10

# Builds a message from a raw payload and its delivery metadata, decodes the
# payload with the ACL class that matches the metadata's type, and then hands
# the decoded payload to the supplied block.
#
# @param payload [String] raw body; passed to the ACL class's #parse_from_string
# @param metadata [#type] delivery metadata; its #type is looked up through
#   Brown::ACLLookup.get_by_hash to find the ACL class
# @param requeue_queue [#name] queue used for requeues; only #name is read here
#   (by the default logging callbacks)
# @param requeue_options [Hash] requeue settings. A copy is stored and these
#   defaults are filled in on the copy when absent: :strategy (:linear),
#   :on_requeue and :on_requeue_limit (callbacks that log at info level)
# @param opts [Hash] :auto_ack - when truthy, #ack is called after the block
# @yieldparam payload [Object] the decoded payload
# @yieldparam message [Brown::Message] this message
# @raise [RuntimeError] if no ACL class is found for metadata.type
def initialize(payload, metadata, requeue_queue, requeue_options, opts = {}, &blk)
	@metadata = metadata

	@requeue_queue   = requeue_queue
	# Work on a copy: the defaults below would otherwise be written into the
	# caller's hash, and the default lambdas close over this instance, so a
	# shared hash would keep referring to the first message built from it.
	@requeue_options = requeue_options.dup

	@requeue_options[:strategy] ||= :linear

	@requeue_options[:on_requeue] ||= ->(count, total_count, cumulative_delay) {
		logger.info { "Requeuing (#{@requeue_options[:strategy]}) message on queue: #{@requeue_queue.name}, count: #{count} of #{total_count}." }
	}

	@requeue_options[:on_requeue_limit] ||= ->(message, count, total_count, cumulative_delay) {
		logger.info { "Not attempting any more requeues, requeue limit reached: #{total_count} for queue: #{@requeue_queue.name}, cummulative delay: #{cumulative_delay}s." }
	}

	klass = Brown::ACLLookup.get_by_hash(metadata.type)
	raise RuntimeError, "Unknown ACL: #{metadata.type}" if klass.nil?

	@payload = klass.new.parse_from_string(payload)

	# The block is optional; without one the message is only decoded.
	blk.call(@payload, self) if blk
	ack if opts[:auto_ack]
end

Instance Attribute Details

#metadata ⇒ Object (readonly)

Returns the value of attribute metadata.



8
9
10
# File 'lib/brown/message.rb', line 8

# Reader for the metadata attribute.
#
# @return [Object] the value stored in @metadata (the metadata argument given
#   to the constructor)
def metadata
  @metadata
end

#payload ⇒ Object (readonly)

Returns the value of attribute payload.



8
9
10
# File 'lib/brown/message.rb', line 8

# Reader for the payload attribute.
#
# @return [Object] the value stored in @payload; the constructor sets it to the
#   result of the ACL class's #parse_from_string on the raw payload
def payload
  @payload
end

Instance Method Details

#ack(multiple = false) ⇒ Object



35
36
37
# File 'lib/brown/message.rb', line 35

# Acknowledges this message by delegating to the metadata object's #ack.
#
# @param multiple [Boolean] forwarded unchanged to @metadata.ack
#   (NOTE(review): presumably the AMQP "multiple" flag - confirm against the
#   metadata class)
# @return [Object] whatever @metadata.ack returns
def ack(multiple = false)
	@metadata.ack(multiple)
end

#nak(opts = {}) ⇒ Object Also known as: reject



39
40
41
# File 'lib/brown/message.rb', line 39

# Negatively acknowledges this message by delegating to the metadata object's
# #reject.
#
# @param opts [Hash] forwarded unchanged to @metadata.reject
#   (NOTE(review): accepted keys depend on the metadata class - confirm)
# @return [Object] whatever @metadata.reject returns
def nak(opts = {})
	@metadata.reject(opts)
end

#requeue ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/brown/message.rb', line 45

# Schedules a delayed re-publish of this message on the requeue queue, or, once
# the configured number of requeues has been used up, invokes the
# :on_requeue_limit callback instead.
#
# The delay is :delay multiplied by a factor that depends on :strategy, where n
# is the current requeue number:
#   :linear                        -> n + 1
#   :exponential                   -> 2 ** n
#   :exponential_no_initial_delay  -> 2 ** n - 1
#
# NOTE(review): when the limit is reached, the delay passed to the callback is
# always :delay * n, whatever the strategy - confirm this is intended.
#
# @return [Object] the return value of whichever callback was invoked
# @raise [RuntimeError] if :strategy is not one of the strategies listed above
def requeue
	attempts_so_far = current_requeue_number
	next_attempt    = attempts_so_far + 1
	limit           = @requeue_options[:count]
	base_delay      = @requeue_options[:delay]

	# Limit reached: report it and stop, nothing is published.
	unless attempts_so_far < limit
		return @requeue_options[:on_requeue_limit].call(@payload, next_attempt, limit, base_delay * attempts_so_far)
	end

	strategy = @requeue_options[:strategy].to_sym
	multiplier = case strategy
	             when :linear                       then next_attempt
	             when :exponential                  then 2 ** attempts_so_far
	             when :exponential_no_initial_delay then 2 ** attempts_so_far - 1
	             else raise RuntimeError, "Unknown requeue strategy #{strategy.inspect}"
	             end
	cumulative_delay = base_delay * multiplier

	EM.add_timer(cumulative_delay) do
		# Read the requeue number when the timer fires, then publish with the
		# incremented 'requeue' header merged over the existing headers.
		bumped_headers = (@metadata.headers || {}).merge('requeue' => current_requeue_number + 1)
		publish_opts   = @metadata.to_hash.merge(:headers => bumped_headers)
		@requeue_queue.publish(@payload, publish_opts)
	end

	@requeue_options[:on_requeue].call(next_attempt, limit, cumulative_delay)
end