Class: ActiveJob::QueueAdapters::ActiveElasticJobAdapter

Inherits:
Object
  • Object
show all
Extended by:
ActiveElasticJob::MD5MessageDigestCalculation
Defined in:
lib/active_job/queue_adapters/active_elastic_job_adapter.rb

Overview

Active Elastic Job adapter for Active Job

Active Elastic Job provides (1) an adapter (this class) for Rails’ Active Job framework and (2) a Rack middleware to process job requests, which are sent by the SQS daemon running in Amazon Elastic Beanstalk worker environments.

This adapter serializes job objects and sends them as a message to an Amazon SQS queue specified by the job’s queue name, see ActiveJob::Base.queue_as

To use Active Elastic Job, set the queue_adapter config to :active_elastic_job.

Rails.application.config.active_job.queue_adapter = :active_elastic_job

Defined Under Namespace

Classes: DelayTooLong, Error, MD5MismatchError, NonExistentQueue, SerializedJobTooBig

Constant Summary collapse

MAX_MESSAGE_SIZE =
(256 * 1024)
MAX_DELAY_IN_MINUTES =
15

Constants included from ActiveElasticJob::MD5MessageDigestCalculation

ActiveElasticJob::MD5MessageDigestCalculation::NORMALIZED_ENCODING, ActiveElasticJob::MD5MessageDigestCalculation::TRANSPORT_TYPE_ENCODINGS

Class Method Summary collapse

Instance Method Summary collapse

Methods included from ActiveElasticJob::MD5MessageDigestCalculation

md5_of_message_attributes, md5_of_message_body

Class Method Details

.enqueue(job) ⇒ Object

:nodoc:



100
101
102
# File 'lib/active_job/queue_adapters/active_elastic_job_adapter.rb', line 100

def enqueue(job) #:nodoc:
  enqueue_at(job, Time.now)
end

.enqueue_at(job, timestamp) ⇒ Object

:nodoc:



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/active_job/queue_adapters/active_elastic_job_adapter.rb', line 104

def enqueue_at(job, timestamp) #:nodoc:
  serialized_job = JSON.dump(job.serialize)
  check_job_size!(serialized_job)
  message = build_message(job.queue_name, serialized_job, timestamp)
  resp = aws_sqs_client.send_message(message)
  unless aws_client_verifies_md5_digests?
    verify_md5_digests!(
      resp,
      message[:message_body],
      message[:message_attributes])
  end
  job.provider_job_id = resp.message_id
rescue Aws::SQS::Errors::NonExistentQueue => e
  unless @queue_urls[job.queue_name.to_s].nil?
    @queue_urls[job.queue_name.to_s] = nil
    retry
  end
  raise NonExistentQueue.new(job, aws_region)
rescue Aws::Errors::ServiceError => e
  raise Error, "Could not enqueue job, #{e.message}"
end

Instance Method Details

#enqueue(job) ⇒ Object

:nodoc:



91
92
93
# File 'lib/active_job/queue_adapters/active_elastic_job_adapter.rb', line 91

def enqueue(job) #:nodoc:
  self.class.enqueue job
end

#enqueue_at(job, timestamp) ⇒ Object

:nodoc:



95
96
97
# File 'lib/active_job/queue_adapters/active_elastic_job_adapter.rb', line 95

def enqueue_at(job, timestamp) #:nodoc:
  self.class.enqueue_at(job, timestamp)
end