Class: Quebert::Job
Direct Known Subclasses
AsyncSender::ActiveRecord::RecordJob, AsyncSender::Instance::InstanceJob, AsyncSender::Object::ObjectJob
Defined Under Namespace
Classes: Priority
Constant Summary collapse
- DEFAULT_JOB_DELAY =
Delay a job for 0 seconds on the jobqueue
0- DEFAULT_JOB_TTR =
By default, the job should live for 10 seconds tops.
10- QUEBERT_TTR_BUFFER =
A buffer time in seconds added to the Beanstalk TTR for Quebert to do its own job cleanup The job will perform based on the Beanstalk TTR, but Beanstalk hangs on to the job just a little longer so that Quebert can bury the job or schedule a retry with the appropriate delay
5- NotImplemented =
Exceptions are used for signaling job status… ewww. Yank this out and replace with a more well thought out controller.
Class.new(StandardError)
- Action =
Class.new(Exception)
- Bury =
Class.new(Action)
- Delete =
Class.new(Action)
- Release =
Class.new(Action)
- Timeout =
Class.new(Action)
- Retry =
Class.new(Action)
Instance Attribute Summary collapse
-
#args ⇒ Object
readonly
Returns the value of attribute args.
-
#delay ⇒ Object
Returns the value of attribute delay.
-
#priority ⇒ Object
Returns the value of attribute priority.
-
#ttr ⇒ Object
Returns the value of attribute ttr.
Class Method Summary collapse
- .backend ⇒ Object
- .backend=(backend) ⇒ Object
-
.from_json(json) ⇒ Object
Read a JSON string and convert into a hash that Ruby can deal with.
Instance Method Summary collapse
-
#enqueue(opts = {}) ⇒ Object
Accepts arguments that override the job options and enqueu this stuff.
-
#initialize(*args) {|_self| ... } ⇒ Job
constructor
A new instance of Job.
- #perform(*args) ⇒ Object
-
#perform! ⇒ Object
Runs the perform method that somebody else should be implementing.
-
#to_json ⇒ Object
Serialize the job into a JSON string that we can put on the beandstalkd queue.
Constructor Details
#initialize(*args) {|_self| ... } ⇒ Job
Returns a new instance of Job.
39 40 41 42 43 44 45 46 |
# File 'lib/quebert/job.rb', line 39 def initialize(*args) @priority = Job::Priority::MEDIUM @delay = DEFAULT_JOB_DELAY @ttr = DEFAULT_JOB_TTR @args = args.dup.freeze yield self if block_given? self end |
Instance Attribute Details
#args ⇒ Object (readonly)
Returns the value of attribute args.
8 9 10 |
# File 'lib/quebert/job.rb', line 8 def args @args end |
#delay ⇒ Object
Returns the value of attribute delay.
9 10 11 |
# File 'lib/quebert/job.rb', line 9 def delay @delay end |
#priority ⇒ Object
Returns the value of attribute priority.
9 10 11 |
# File 'lib/quebert/job.rb', line 9 def priority @priority end |
#ttr ⇒ Object
Returns the value of attribute ttr.
9 10 11 |
# File 'lib/quebert/job.rb', line 9 def ttr @ttr end |
Class Method Details
.backend ⇒ Object
90 91 92 |
# File 'lib/quebert/job.rb', line 90 def self.backend @backend || Quebert.configuration.backend end |
.backend=(backend) ⇒ Object
86 87 88 |
# File 'lib/quebert/job.rb', line 86 def self.backend=(backend) @backend = backend end |
.from_json(json) ⇒ Object
Read a JSON string and convert into a hash that Ruby can deal with.
80 81 82 83 84 |
# File 'lib/quebert/job.rb', line 80 def self.from_json(json) if hash = JSON.parse(json) and not hash.empty? Serializer::Job.deserialize(hash) end end |
Instance Method Details
#enqueue(opts = {}) ⇒ Object
Accepts arguments that override the job options and enqueu this stuff.
69 70 71 72 |
# File 'lib/quebert/job.rb', line 69 def enqueue(opts={}) opts.each { |opt, val| self.send("#{opt}=", val) } self.class.backend.put self, @priority, @delay, @ttr + QUEBERT_TTR_BUFFER end |
#perform(*args) ⇒ Object
48 49 50 |
# File 'lib/quebert/job.rb', line 48 def perform(*args) raise NotImplemented end |
#perform! ⇒ Object
Runs the perform method that somebody else should be implementing
53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/quebert/job.rb', line 53 def perform! Quebert.config.before_job(self) Quebert.config.around_job(self) # Honor the timeout and kill the job in ruby-space. Beanstalk # should be cleaning up this job and returning it to the queue # as well. val = ::Timeout.timeout(@ttr, Job::Timeout){ perform(*args) } Quebert.config.around_job(self) Quebert.config.after_job(self) val end |
#to_json ⇒ Object
Serialize the job into a JSON string that we can put on the beandstalkd queue.
75 76 77 |
# File 'lib/quebert/job.rb', line 75 def to_json JSON.generate(Serializer::Job.serialize(self)) end |