Class: SimpleJob::SQSJobQueue

Inherits:
JobQueue < Object
Defined in:
lib/simple_job/sqs_job_queue.rb

Overview

A SimpleJob::JobQueue implementation that uses AWS SQS

Constant Summary

DEFAULT_POLL_INTERVAL = 1

The default :poll_interval used by #poll, in seconds.

Methods inherited from JobQueue

[], default, queue_class, register_job_queue

Class Attribute Details

.queues ⇒ Object

Returns the hash of queues registered by .define_queue, keyed by type (as a string). It is nil until the first queue is defined.



# File 'lib/simple_job/sqs_job_queue.rb', line 253

def queues
  @queues
end

Class Method Details

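.config(options = {}) ⇒ Object

Returns the class-wide configuration hash, merging options into it first. The hash is created on the first call with these defaults: :queue_prefix from the SIMPLE_JOB_SQS_JOB_QUEUE_PREFIX environment variable, :default_visibility_timeout of 60 seconds, :environment set to Rails.env when Rails is defined (otherwise 'development'), and :cloud_watch_namespace of nil. Raises an error if :queue_prefix is still unset after merging.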



# File 'lib/simple_job/sqs_job_queue.rb', line 15

def self.config(options = {})
  @config ||= {
    :queue_prefix => ENV['SIMPLE_JOB_SQS_JOB_QUEUE_PREFIX'],
    :default_visibility_timeout => 60,
    :environment => (defined?(Rails) && Rails.env) || 'development',
    :cloud_watch_namespace => nil,
  }

  @config.merge!(options) if options

  raise 'must configure :queue_prefix using SQSJobQueue.config' if !@config[:queue_prefix]

  @config
end
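The memoize-then-merge behavior of .config can be seen in isolation with the following sketch. ConfigSketch is a hypothetical stand-in: it keeps the same keys and merge logic, but hard-codes the defaults instead of reading ENV and Rails.env.

```ruby
# Hypothetical stand-in for SQSJobQueue.config; defaults are hard-coded here,
# whereas the real method reads ENV and Rails.env on the first call.
module ConfigSketch
  def self.config(options = {})
    # Created once, then reused (and mutated) by every later call.
    @config ||= {
      queue_prefix: nil,
      default_visibility_timeout: 60,
      environment: 'development',
      cloud_watch_namespace: nil
    }

    # merge! mutates the memoized hash, so settings persist across calls.
    @config.merge!(options) if options

    raise 'must configure :queue_prefix' unless @config[:queue_prefix]

    @config
  end
end
```

Because options are merged into a single memoized hash, settings accumulate across calls, and a call with no arguments simply returns the current configuration (or raises if :queue_prefix was never set). In the real method, the environment variable is read only when the hash is first created.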

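.default_queue ⇒ Object

Returns the queue most recently defined with default: true via .define_queue, falling back to the superclass's default_queue if there is none.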



# File 'lib/simple_job/sqs_job_queue.rb', line 30

def self.default_queue
  @default_queue || super
end

.define_queue(type, options = {}) ⇒ Object

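Sets up an SQS queue, using the given type as a unique identifier for its name, and registers it so that .get_queue(type) returns it. Passing default: true also makes it the queue returned by .default_queue.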

A :visibility_timeout option (in seconds) may be passed to override the visibility timeout used when polling the queue. It defaults to config[:default_visibility_timeout], which is 60 unless configured otherwise.

The :asynchronous_execute option, if set to true, causes the poll method to parse and immediately accept each message (if it is validly formatted), then fork and execute the matching job in a separate, detached process. Use this for long-running jobs that would exceed the visibility timeout, when it is acceptable that a failed job is not retried.
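The fork-and-detach approach can be sketched with a hypothetical run_detached helper. As in the asynchronous branch of #poll, the parent does not wait for the child and a failing job is only logged, which is why such jobs are not retried. The sketch assumes a platform that supports fork (it will not run on Windows).

```ruby
# Hypothetical helper illustrating fork-and-detach execution: the job runs in
# a child process, the parent moves on immediately, and errors are only logged.
def run_detached(&job)
  pid = fork do
    begin
      job.call
    rescue Exception => e # broad rescue, like the child branch in #poll
      warn "error executing asynchronous job: #{e.message}"
    end
  end

  # Process.detach reaps the child in a background thread so it does not
  # linger as a zombie; the returned thread's #value is the Process::Status.
  Process.detach(pid)
end
```

In #poll the parent discards the thread returned by Process.detach; keeping it, as here, is only useful when something (such as a test) needs to wait for the child.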

You may pass an :accept_nested_definition option with a string value to allow this queue to accept messages where the body is nested within a hash entry. This facilitates easy processing of SNS and AutoScaling messages. For example, if you pass this option:

accept_nested_definition: 'NotificationMetadata'

Then you can put a job body into the NotificationMetadata of an AutoScaling notification:

{ "AutoScalingGroupName": "some_name", "Service": "AWS Auto Scaling" ...
  "NotificationMetadata": "{\"type\":\"my_job\",\"version\":\"1\"}" }

The queue then processes incoming messages normally, but if it encounters a message missing a type and version, it looks for the job body under the key named by accept_nested_definition before treating the message as invalid.
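The nested-definition fallback described above can be sketched as follows. This is assumed logic written from the description, not the gem's own message-body extraction (whose source is not shown here); extract_job_body is a hypothetical name.

```ruby
require 'json'

# Hypothetical helper (assumed logic): returns the JSON string that should be
# treated as the job body, falling back to a string stored under nested_key.
def extract_job_body(body, nested_key = nil)
  parsed = JSON.parse(body)
  return body unless parsed.is_a?(Hash)

  # A body that already names a type and version is used as-is.
  return body if parsed['type'] && parsed['version']

  # Otherwise look for a JSON string under the configured key, if any.
  nested_json = nested_key && parsed[nested_key]
  return body unless nested_json.is_a?(String)

  nested = JSON.parse(nested_json)
  nested.is_a?(Hash) && nested['type'] && nested['version'] ? nested_json : body
end
```

When neither the outer body nor the nested entry names a type and version, the original body is returned unchanged, so it would then be rejected by the usual type/version check.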



# File 'lib/simple_job/sqs_job_queue.rb', line 61

def self.define_queue(type, options = {})
  type = type.to_s

  options = {
    visibility_timeout: config[:default_visibility_timeout],
    asynchronous_execute: false,
    default: false
  }.merge(options)
  make_default = options.delete(:default)

  queue = self.new(type, options)
  self.queues ||= {}
  self.queues[type] = queue

  @default_queue = queue if make_default

  queue
end

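.get_queue(type, options = {}) ⇒ Object

Returns the queue registered under type (converted to a string) by .define_queue, or falls back to the superclass's get_queue if no such queue was defined.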



# File 'lib/simple_job/sqs_job_queue.rb', line 80

def self.get_queue(type, options = {})
  type = type.to_s
  (self.queues || {})[type] || super
end
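The interplay of .define_queue, .get_queue and .default_queue is a class-level registry with a fallback to the superclass. The self-contained model below mirrors that pattern. BaseQueue is a hypothetical stand-in for SimpleJob::JobQueue, and its raising get_queue is an assumption made only so the fallback is observable; the hard-coded 60 replaces config[:default_visibility_timeout].

```ruby
# Hypothetical stand-in for SimpleJob::JobQueue; its real fallback behavior
# is not shown in this page, so raising here is an illustrative assumption.
class BaseQueue
  def self.get_queue(type, _options = {})
    raise ArgumentError, "no queue defined for #{type}"
  end

  def self.default_queue
    nil
  end
end

class RegistryQueue < BaseQueue
  class << self
    attr_accessor :queues # class-level registry, like SQSJobQueue.queues
  end

  attr_reader :type, :options

  def initialize(type, options)
    @type = type
    @options = options
  end

  def self.define_queue(type, options = {})
    type = type.to_s
    # 60 stands in for config[:default_visibility_timeout].
    options = { visibility_timeout: 60, asynchronous_execute: false, default: false }.merge(options)
    make_default = options.delete(:default) # not passed on to the queue itself
    queue = new(type, options)
    self.queues ||= {}
    queues[type] = queue
    @default_queue = queue if make_default
    queue
  end

  def self.get_queue(type, options = {})
    (queues || {})[type.to_s] || super
  end

  def self.default_queue
    @default_queue || super
  end
end
```

Since type is converted with to_s on both sides, symbols and strings find the same queue.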

Instance Method Details

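#enqueue(message, options = {}) ⇒ Object

Sends message to the underlying SQS queue. The message must be a raw String (anything else raises an error); options are passed through unchanged to the SQS queue's send_message call.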



# File 'lib/simple_job/sqs_job_queue.rb', line 85

def enqueue(message, options = {})
  raise("enqueue expects a raw string") unless message.is_a?(String)
  sqs_queue.send_message(message, options)
end
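Because #enqueue only accepts a raw String, callers need to serialize the job themselves. The sketch below shows one way to build a body that #poll would dispatch, since #poll only handles bodies whose top level has "type" and "version". build_job_body is a hypothetical helper, and stringifying the version is an assumption; the version format expected by registered definitions is not shown here.

```ruby
require 'json'

# Hypothetical helper: builds the raw String body that #enqueue requires.
# The "type" and "version" keys are what #poll uses to find a job definition.
def build_job_body(type, version, attributes = {})
  fields = attributes.transform_keys(&:to_s) # avoid duplicate symbol/string keys
  JSON.generate(fields.merge('type' => type.to_s, 'version' => version.to_s))
end
```

The result could then be passed as, for example, queue.enqueue(build_job_body(:my_job, 1, user_id: 42)) on a queue obtained from .get_queue.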

#poll(options = {}, &block) ⇒ Object

Polls the queue, matching incoming messages with registered jobs, and executing the proper job/version.
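The per-message matching step can be modeled without SQS. In this sketch a hypothetical DEFINITIONS hash stands in for JobDefinition.job_definition_class_for, and JobDefinitionStub stands in for a definition class; the checks and error messages follow the #poll source shown on this page.

```ruby
require 'json'

# Hypothetical stand-ins for registered job definition classes.
JobDefinitionStub = Struct.new(:type, :version, :max_attempt_count)

DEFINITIONS = {
  ['my_job', '1'] => JobDefinitionStub.new('my_job', '1', 3)
}.freeze

# Applies the same checks #poll makes to each received body. Returns :ignored
# for bodies without type/version; raises for unknown jobs or too many attempts.
def match_message(body, receive_count)
  raw = JSON.parse(body)
  return :ignored unless raw['type'] && raw['version']

  definition = DEFINITIONS[[raw['type'], raw['version']]]
  raise 'no definition found' unless definition

  if definition.max_attempt_count && receive_count > definition.max_attempt_count
    raise 'max attempt count reached'
  end

  definition
end
```

In #poll, the two raised errors are caught by the loop's rescue, logged, and (unless :raise_exceptions is set) polling continues.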

If called without a block, it will simply call the #execute method of the matched job. A block may be passed to add custom logic, but in this case the caller is responsible for calling #execute. The block will be passed two arguments, the matching job definition (already populated with the contents of the message) and the raw AWS message.

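The #execute method MAY declare a parameter, which will be populated with the raw AWS::SQS::ReceivedMessage object. The message is passed only when the arity of #execute is 1 or more, so the parameter must be required: an optional one, as in execute(message = nil), receives nothing.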
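The arity rule used by the default handler can be checked directly. The lambda below reproduces the handler from the #poll source; NoArgJob, MessageAwareJob and OptionalArgJob are hypothetical job definitions for illustration.

```ruby
# Same dispatch rule as #poll's default handler: pass the raw message only if
# #execute has at least one required parameter (Method#arity >= 1).
DEFAULT_HANDLER = lambda do |definition, message|
  execute = definition.method(:execute)
  args = execute.arity >= 1 ? [message] : []
  execute.call(*args)
end

# Hypothetical job definitions.
class NoArgJob
  def execute
    :ran_without_message
  end
end

class MessageAwareJob
  def execute(message)
    "ran with #{message}"
  end
end

class OptionalArgJob
  # Arity is -1 here, so the default handler calls this with no arguments.
  def execute(message = nil)
    message.nil? ? :no_message : :got_message
  end
end
```

The OptionalArgJob case is the subtle one: an optional parameter makes Method#arity negative, so the message is never supplied.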

The queue’s configured visibility timeout will be used unless the :visibility_timeout option is passed (as a number of seconds).

By default, the message’s ‘sent_at’, ‘receive_count’, and ‘first_received_at’ attributes will be populated in the AWS message, but this may be overridden by passing an array of symbols to the :attributes option.

By default, errors during job execution or message polling will be logged and the polling will continue, but that behavior may be changed by setting the :raise_exceptions option to true.

By default, this method polls indefinitely. If you pass an :idle_timeout option (in seconds), polling stops and the method returns once that many seconds pass without a valid job message being received. In either case, if the process receives a HUP, INT, or TERM signal, the method finishes processing the current message and then returns.

You may also pass a :max_executions option (as an integer), in which case the poll method will poll that many times and then return. Every poll counts toward this limit, whether or not it received a message.

If the :poll_interval option is set (in seconds; it defaults to DEFAULT_POLL_INTERVAL, and 0 disables sleeping), polling pauses for that many seconds whenever a poll returns no message. If :always_sleep is set to true, polling pauses after every poll, even when a message was received and more may be available.
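The loop-control rules above (:max_executions, :poll_interval, :always_sleep) can be simulated without SQS or real sleeping. simulate_poll is a hypothetical helper: each element of arrivals says whether that poll received a job message, and it counts sleeps instead of calling Kernel.sleep. The :idle_timeout check, which the real loop performs just before deciding whether to sleep, is not modeled.

```ruby
# Hypothetical simulation of #poll's loop control. Returns how many polls ran
# and how many times the loop would have slept.
def simulate_poll(arrivals, max_executions: nil, poll_interval: 1, always_sleep: false)
  iterations = 0
  sleeps = 0
  remaining = max_executions

  arrivals.each do |got_message|
    break if remaining && remaining <= 0
    iterations += 1

    # Same condition as #poll: sleep when idle, or always if requested,
    # unless the interval is zero.
    if (always_sleep || !got_message) && poll_interval != 0
      sleeps += 1 # the real method calls Kernel.sleep(poll_interval) here
    end

    # Every poll counts toward :max_executions, message or not.
    remaining -= 1 if remaining
  end

  { iterations: iterations, sleeps: sleeps }
end
```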

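Note that this method overrides any signal handlers for the HUP, INT, and TERM signals while it runs, and restores the previous handlers when the polling loop exits normally. If an exception propagates out of the method (for example, with :raise_exceptions set to true), the restore step is skipped and the handlers installed by #poll remain in place.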
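The trap-and-restore pattern can be isolated in a hypothetical with_shutdown_traps helper. Unlike the #poll source shown on this page, it restores the previous handlers in an ensure clause, so they come back even when the block raises; treat that as a suggested variation, not the gem's behavior. Signal names are POSIX-specific.

```ruby
SHUTDOWN_SIGNALS = %w[HUP INT TERM].freeze

# Hypothetical helper: while the block runs, HUP/INT/TERM only set a flag
# (exposed to the block as a lambda); previous handlers are restored afterwards.
def with_shutdown_traps
  stop_requested = false
  previous = {}

  SHUTDOWN_SIGNALS.each do |sig|
    # Signal.trap returns the handler that was installed before this call.
    previous[sig] = Signal.trap(sig) { stop_requested = true }
  end

  yield(-> { stop_requested })
ensure
  # Runs on normal return and when the block raises.
  previous.each { |sig, handler| Signal.trap(sig, handler) }
end
```

A polling loop would call the lambda after each message and break when it returns true, which is how #poll uses its exit_next flag.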



# File 'lib/simple_job/sqs_job_queue.rb', line 130

def poll(options = {}, &block)
  options = {
    :visibility_timeout => visibility_timeout,
    :attributes => [ :sent_at, :receive_count, :first_received_at ],
    :raise_exceptions => false,
    :idle_timeout => nil,
    :poll_interval => DEFAULT_POLL_INTERVAL,
    :max_executions => nil,
    :always_sleep => false
  }.merge(options)

  message_handler = block || lambda do |definition, message|
    execute_method = definition.method(:execute)
    arguments = []
    if execute_method.arity >= 1
      arguments << message
    end
    execute_method.call(*arguments)
  end

  exit_next = false

  logger.debug 'trapping terminate signals with function to exit loop'
  signal_exit = lambda do |*args|
    logger.info "caught signal to shutdown; finishing current message and quitting..."
    exit_next = true
  end
  previous_traps = {}
  ['HUP', 'INT', 'TERM'].each do |signal|
    previous_traps[signal] = Signal.trap(signal, signal_exit)
  end

  last_message_at = Time.now

  max_executions = options[:max_executions]
  loop do
    break if max_executions && (max_executions <= 0)
    last_message = nil
    last_definition = nil
    current_start_milliseconds = get_milliseconds
    current_job_type = 'unknown'
    begin
      sqs_queue.receive_messages(options) do |message|
        message_body = get_message_body(message)
        raw_message = JSON.parse(message_body)

        if raw_message['type'] && raw_message['version']
          last_message = message
          last_message_at = Time.now
          current_job_type = raw_message['type']
          definition_class = JobDefinition.job_definition_class_for(raw_message['type'], raw_message['version'])

          raise('no definition found') if !definition_class

          if definition_class.max_attempt_count && (message.receive_count > definition_class.max_attempt_count)
            raise('max attempt count reached')
          end

          definition = definition_class.new.from_json(message_body)
          last_definition = definition

          # NOTE: only executes if asynchronous_execute is false (message will be re-enqueued after
          # vis. timeout if this fails or runs too long)
          message_handler.call(definition, message) unless asynchronous_execute
        else
          logger.info("ignoring invalid message: #{message_body}")
        end
      end

      # NOTE: only executes if asynchronous_execute is set (after message has been confirmed)
      if asynchronous_execute && last_message
        pid = fork
        if pid
          # in parent
          Process.detach pid
        else
          # in child
          begin
            message_handler.call(last_definition, last_message)
            log_execution(true, last_message, current_job_type, current_start_milliseconds)
          rescue Exception => e
            logger.error("error executing asynchronous job: #{e.message}")
            logger.error e.backtrace.join("\n  ")
          end
          exit
        end
      else
        log_execution(true, last_message, current_job_type, current_start_milliseconds)
      end

      break if options[:idle_timeout] && ((Time.now - last_message_at) > options[:idle_timeout])

      if options[:always_sleep] || !last_message
        Kernel.sleep(options[:poll_interval]) unless options[:poll_interval] == 0
      end
    rescue SystemExit => e
      raise e
    rescue Exception => e
      log_execution(false, last_message, current_job_type, current_start_milliseconds) rescue nil

      if options[:raise_exceptions]
        raise e
      else
        logger.error("unable to process message: #{e.message}")
        logger.error("message body: #{last_message && last_message.body}")
        logger.error(e.backtrace.join("\n  "))
      end
    end
    max_executions -= 1 if max_executions
    break if exit_next
  end

  logger.debug 'restoring previous signal traps'
  previous_traps.each do |signal, command|
    Signal.trap(signal, command)
  end

  logger.info "shutdown successful"
end