Class: Beetle::Message

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/beetle/message.rb

Overview

Instances of class Message are created when a subscription callback fires. Class Message contains the code responsible for message deduplication and determining if it should retry executing the message handler after a handler has crashed (or forcefully aborted).

Constant Summary collapse

FORMAT_VERSION =

current message format version

1
FLAG_REDUNDANT =

flag for encoding redundant messages

1
DEFAULT_TTL =

default lifetime of messages

1.day
DEFAULT_HANDLER_TIMEOUT =

forcefully abort a running handler after this many seconds. can be overriden when registering a handler.

600.seconds
TIMEOUT_GRACE_PERIOD =

How much extra time on top of the handler timeout we add before considering a handler timed out

10.seconds
DEFAULT_HANDLER_EXECUTION_ATTEMPTS =

how many times we should try to run a handler before giving up

1
DEFAULT_HANDLER_EXECUTION_ATTEMPTS_DELAY =

how many seconds we should wait before retrying handler execution

10.seconds
DEFAULT_EXCEPTION_LIMIT =

how many exceptions should be tolerated before giving up

0

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logging

#logger

Constructor Details

#initialize(queue, header, body, opts = {}) ⇒ Message

Returns a new instance of Message.



65
66
67
68
69
70
71
# File 'lib/beetle/message.rb', line 65

def initialize(queue, header, body, opts = {})
  @queue  = queue
  @header = header
  @data   = body
  setup(opts)
  decode
end

Instance Attribute Details

#attempts_limitObject (readonly)

how many times we should try to run the handler



55
56
57
# File 'lib/beetle/message.rb', line 55

def attempts_limit
  @attempts_limit
end

#dataObject (readonly)

message payload



41
42
43
# File 'lib/beetle/message.rb', line 41

def data
  @data
end

#delayObject (readonly)

how long to wait before retrying the message handler



51
52
53
# File 'lib/beetle/message.rb', line 51

def delay
  @delay
end

#exceptionObject (readonly)

exception raised by handler execution



61
62
63
# File 'lib/beetle/message.rb', line 61

def exception
  @exception
end

#exceptions_limitObject (readonly)

how many exceptions we should tolerate before giving up



57
58
59
# File 'lib/beetle/message.rb', line 57

def exceptions_limit
  @exceptions_limit
end

#expires_atObject (readonly)

unix timestamp after which the message should be considered stale



47
48
49
# File 'lib/beetle/message.rb', line 47

def expires_at
  @expires_at
end

#flagsObject (readonly)

flags sent with the message



45
46
47
# File 'lib/beetle/message.rb', line 45

def flags
  @flags
end

#format_versionObject (readonly)

the message format version of the message



43
44
45
# File 'lib/beetle/message.rb', line 43

def format_version
  @format_version
end

#handler_resultObject (readonly)

value returned by handler execution



63
64
65
# File 'lib/beetle/message.rb', line 63

def handler_result
  @handler_result
end

#headerObject (readonly)

the AMQP header received with the message



35
36
37
# File 'lib/beetle/message.rb', line 35

def header
  @header
end

#max_delayObject (readonly)

maximum wait time for message handler retries (uses exponential backoff)



53
54
55
# File 'lib/beetle/message.rb', line 53

def max_delay
  @max_delay
end

#queueObject (readonly)

name of the queue on which the message was received



33
34
35
# File 'lib/beetle/message.rb', line 33

def queue
  @queue
end

#retry_onObject (readonly)

array of exceptions accepted to be rescued and retried



59
60
61
# File 'lib/beetle/message.rb', line 59

def retry_on
  @retry_on
end

#serverObject (readonly)

server from which the message was received



31
32
33
# File 'lib/beetle/message.rb', line 31

def server
  @server
end

#timeoutObject (readonly)

how many seconds the handler is allowed to execute



49
50
51
# File 'lib/beetle/message.rb', line 49

def timeout
  @timeout
end

#timestampObject (readonly)

unix timestamp when the message was published



39
40
41
# File 'lib/beetle/message.rb', line 39

def timestamp
  @timestamp
end

#uuidObject (readonly)

the uuid of the message



37
38
39
# File 'lib/beetle/message.rb', line 37

def uuid
  @uuid
end

Class Method Details

.generate_uuidObject

generate uuid for publishing



152
153
154
# File 'lib/beetle/message.rb', line 152

def self.generate_uuid
  SecureRandom.uuid
end

.nowObject

current time (UNIX timestamp)



142
143
144
# File 'lib/beetle/message.rb', line 142

def self.now #:nodoc:
  Time.now.to_i
end

.publishing_options(opts = {}) ⇒ Object

build hash with options for the publisher



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/beetle/message.rb', line 101

def self.publishing_options(opts = {}) #:nodoc:
  flags = 0
  flags |= FLAG_REDUNDANT if opts[:redundant]
  expires_at = now + (opts[:ttl] || DEFAULT_TTL).to_i
  opts = opts.slice(*PUBLISHING_KEYS)
  opts[:message_id] = generate_uuid.to_s
  opts[:timestamp] = now
  headers = {}
  headers.merge!(opts[:headers]) if opts[:headers]
  headers.reject! {|k,v| v.nil? }
  headers.each {|k,v| headers[k] = v.to_s if v.is_a?(Symbol) }
  headers.merge!(
    :format_version => FORMAT_VERSION.to_s,
    :flags => flags.to_s,
    :expires_at => expires_at.to_s
  )
  opts[:headers] = headers
  opts
end

Instance Method Details

#aquire_mutex!Object

aquire execution mutex before we run the handler (and delete it if we can’t aquire it).



241
242
243
244
245
246
247
248
# File 'lib/beetle/message.rb', line 241

def aquire_mutex!
  if mutex = @store.setnx(msg_id, :mutex, now)
    logger.debug "Beetle: aquired mutex: #{msg_id}"
  else
    delete_mutex!
  end
  mutex
end

#attemptsObject

how many times we already tried running the handler



202
203
204
# File 'lib/beetle/message.rb', line 202

def attempts
  @store.get(msg_id, :attempts).to_i
end

#attempts_limit_reached?(attempts = nil) ⇒ Boolean

whether we have already tried running the handler as often as specified when the handler was registered

Returns:

  • (Boolean)


212
213
214
# File 'lib/beetle/message.rb', line 212

def attempts_limit_reached?(attempts = nil)
  (attempts ||= @store.get(msg_id, :attempts)) && attempts.to_i >= attempts_limit
end

#completed!Object

mark message handling complete in the deduplication store



187
188
189
# File 'lib/beetle/message.rb', line 187

def completed!
  @store.mset(msg_id, :status => "completed", :timeout => 0)
end

#completed?Boolean

message handling completed?

Returns:

  • (Boolean)


182
183
184
# File 'lib/beetle/message.rb', line 182

def completed?
  @store.get(msg_id, :status) == "completed"
end

#decodeObject

extracts various values from the AMQP header properties



87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/beetle/message.rb', line 87

def decode #:nodoc:
  amqp_headers = header.attributes
  @uuid = amqp_headers[:message_id]
  @timestamp = amqp_headers[:timestamp]
  headers = amqp_headers[:headers].symbolize_keys
  @format_version = headers[:format_version].to_i
  @flags = headers[:flags].to_i
  @expires_at = headers[:expires_at].to_i
rescue Exception => @exception
  Beetle::reraise_expectation_errors!
  logger.error "Could not decode message. #{self.inspect}"
end

#delayed?(t = nil) ⇒ Boolean

whether we should wait before running the handler

Returns:

  • (Boolean)


192
193
194
# File 'lib/beetle/message.rb', line 192

def delayed?(t = nil)
  (t ||= @store.get(msg_id, :delay)) && t.to_i > now
end

#delete_mutex!Object

delete execution mutex



251
252
253
254
# File 'lib/beetle/message.rb', line 251

def delete_mutex!
  @store.del(msg_id, :mutex)
  logger.debug "Beetle: deleted mutex: #{msg_id}"
end

#exception_accepted?Boolean

Returns:

  • (Boolean)


226
227
228
# File 'lib/beetle/message.rb', line 226

def exception_accepted?
  @exception.nil? || retry_on.nil? || retry_on.any?{ |klass| @exception.is_a? klass}
end

#exceptions_limit_reached?(exceptions = nil) ⇒ Boolean

whether the number of exceptions has exceeded the limit set when the handler was registered

Returns:

  • (Boolean)


222
223
224
# File 'lib/beetle/message.rb', line 222

def exceptions_limit_reached?(exceptions = nil)
  (exceptions ||= @store.get(msg_id, :exceptions)) && exceptions.to_i > exceptions_limit
end

#expired?Boolean

a message has expired if the header expiration timestamp is smaller than the current time

Returns:

  • (Boolean)


147
148
149
# File 'lib/beetle/message.rb', line 147

def expired?
  @expires_at < now
end

#fetch_status_delay_timeout_attempts_exceptionsObject



256
257
258
# File 'lib/beetle/message.rb', line 256

def fetch_status_delay_timeout_attempts_exceptions
  @store.mget(msg_id, [:status, :delay, :timeout, :attempts, :exceptions])
end

#increment_exception_count!Object

increment number of exception occurences in the deduplication store



217
218
219
# File 'lib/beetle/message.rb', line 217

def increment_exception_count!
  @store.incr(msg_id, :exceptions)
end

#increment_execution_attempts!Object

record the fact that we are trying to run the handler



207
208
209
# File 'lib/beetle/message.rb', line 207

def increment_execution_attempts!
  @store.incr(msg_id, :attempts)
end

#key_exists?Boolean

have we already seen this message? if not, set the status to “incomplete” and store the message exipration timestamp in the deduplication store.

Returns:

  • (Boolean)


232
233
234
235
236
237
238
# File 'lib/beetle/message.rb', line 232

def key_exists?
  old_message = !@store.msetnx(msg_id, :status =>"incomplete", :expires => @expires_at.to_i, :timeout => (now + timeout).to_i)
  if old_message
    logger.debug "Beetle: received duplicate message: #{msg_id} on queue: #{@queue}"
  end
  old_message
end

#msg_idObject

unique message id. used to form various keys in the deduplication store.



132
133
134
# File 'lib/beetle/message.rb', line 132

def msg_id
  @msg_id ||= "msgid:#{queue}:#{uuid}"
end

#nowObject

current time (UNIX timestamp)



137
138
139
# File 'lib/beetle/message.rb', line 137

def now #:nodoc:
  Time.now.to_i
end

#process(handler) ⇒ Object

process this message and do not allow any exception to escape to the caller



261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
# File 'lib/beetle/message.rb', line 261

def process(handler)
  result = nil
  begin
    # pre_process might set up log routing and it might raise
    handler.pre_process(self)
  rescue Exception => @pre_exception
    Beetle::reraise_expectation_errors!
    logger.error "Beetle: preprocessing error #{@pre_exception.class}(#{@pre_exception}) for #{msg_id}"
  end
  logger.debug "Beetle: processing message #{msg_id}(#{timestamp}) redelivered: #{header.redelivered?}"
  begin
    result = process_internal(handler)
    handler.process_exception(@exception || @pre_exception) if (@exception || @pre_exception)
    handler.process_failure(result) if result.failure?
  rescue Exception => e
    Beetle::reraise_expectation_errors!
    logger.warn "Beetle: exception '#{e}' during processing of message #{msg_id}"
    logger.warn "Beetle: backtrace: #{e.backtrace.join("\n")}"
    result = RC::InternalError
  end
  result
end

#redundant?Boolean

whether the publisher has tried sending this message to two servers

Returns:

  • (Boolean)


157
158
159
# File 'lib/beetle/message.rb', line 157

def redundant?
  @flags & FLAG_REDUNDANT == FLAG_REDUNDANT
end

#routing_keyObject Also known as: key

the routing key



122
123
124
125
126
127
128
# File 'lib/beetle/message.rb', line 122

def routing_key
  @routing_key ||= if x_death = header.attributes[:headers]["x-death"]
    x_death.last["routing-keys"].first
  else
    header.routing_key
  end
end

#set_delay!Object

store delay value in the deduplication store



197
198
199
# File 'lib/beetle/message.rb', line 197

def set_delay!
  @store.set(msg_id, :delay, now + next_delay(attempts))
end

#set_timeout!Object

store handler timeout timestamp in the deduplication store



167
168
169
# File 'lib/beetle/message.rb', line 167

def set_timeout!
  @store.set(msg_id, :timeout, (now + timeout).ceil)
end

#setup(opts) ⇒ Object

:nodoc:



73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/beetle/message.rb', line 73

def setup(opts) #:nodoc:
  @server           = opts[:server]
  @timeout          = opts[:timeout]    || DEFAULT_HANDLER_TIMEOUT.to_i
  @delay            = (opts[:delay]     || DEFAULT_HANDLER_EXECUTION_ATTEMPTS_DELAY).ceil
  @attempts_limit   = opts[:attempts]   || DEFAULT_HANDLER_EXECUTION_ATTEMPTS
  @exceptions_limit = opts[:exceptions] || DEFAULT_EXCEPTION_LIMIT
  @attempts_limit   = @exceptions_limit + 1 if @attempts_limit <= @exceptions_limit
  @retry_on         = opts[:retry_on] || nil
  @store            = opts[:store]
  max_delay         = opts[:max_delay] || @delay
  @max_delay        = max_delay.ceil if max_delay >= 2*@delay
end

#simple?Boolean

whether this is a message we can process without accessing the deduplication store

Returns:

  • (Boolean)


162
163
164
# File 'lib/beetle/message.rb', line 162

def simple?
  !redundant? && attempts_limit == 1
end

#timed_out!Object

reset handler timeout in the deduplication store



177
178
179
# File 'lib/beetle/message.rb', line 177

def timed_out!
  @store.set(msg_id, :timeout, 0)
end

#timed_out?(t = nil) ⇒ Boolean

handler timed out?

Returns:

  • (Boolean)


172
173
174
# File 'lib/beetle/message.rb', line 172

def timed_out?(t = nil)
  (t ||= @store.get(msg_id, :timeout)) && (t.to_i + TIMEOUT_GRACE_PERIOD) < now
end