Class: Quebert::Job
Direct Known Subclasses
AsyncSender::ActiveRecord::RecordJob, AsyncSender::Instance::InstanceJob, AsyncSender::Object::ObjectJob
Constant Summary
- DEFAULT_JOB_PRIORITY = 65536
- DEFAULT_JOB_DELAY = 0
- DEFAULT_JOB_TTR = 10
- QUEBERT_TTR_BUFFER = 1
  A buffer time, in seconds, added to the Beanstalk TTR so that Quebert can do its own job cleanup. The job will perform based on the Beanstalk TTR, but Beanstalk hangs on to the job just a little longer so that Quebert can bury the job or schedule a retry with the appropriate delay.
- NotImplemented = Class.new(StandardError)
- Action = Class.new(Exception)
- Bury = Class.new(Action)
- Delete = Class.new(Action)
- Release = Class.new(Action)
- Timeout = Class.new(Action)
Instance Attribute Summary
- #args ⇒ Object (readonly)
  Returns the value of attribute args.
- #delay ⇒ Object
  Returns the value of attribute delay.
- #priority ⇒ Object
  Returns the value of attribute priority.
- #ttr ⇒ Object
  Returns the value of attribute ttr.
Class Method Summary
- .backend ⇒ Object
- .backend=(backend) ⇒ Object
- .from_json(json) ⇒ Object
Instance Method Summary
- #enqueue ⇒ Object
- #initialize(*args) ⇒ Job (constructor)
  A new instance of Job.
- #perform(*args) ⇒ Object
- #perform! ⇒ Object
  Runs the perform method that somebody else should be implementing.
- #to_json ⇒ Object
Constructor Details
#initialize(*args) ⇒ Job
Returns a new instance of Job.
    # File 'lib/quebert/job.rb', line 28
    def initialize(*args)
      opts = args.last.is_a?(::Hash) ? args.pop : nil

      @priority = DEFAULT_JOB_PRIORITY
      @delay = DEFAULT_JOB_DELAY
      @ttr = DEFAULT_JOB_TTR

      if opts
        beanstalk_opts = opts.delete(:beanstalk)
        args << opts unless opts.empty?

        if beanstalk_opts
          @priority = beanstalk_opts[:priority] if beanstalk_opts[:priority]
          @delay = beanstalk_opts[:delay] if beanstalk_opts[:delay]
          @ttr = beanstalk_opts[:ttr] if beanstalk_opts[:ttr]
        end
      end

      @args = args.dup.freeze
    end
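The option handling in the constructor can be tried out with a minimal sketch. `OptionSketch::Job` is a hypothetical stand-in that copies only the constructor logic shown above, so the example runs without Quebert or Beanstalk installed; it is not the library class.

```ruby
# Hypothetical stand-in: mirrors only the constructor logic shown above.
module OptionSketch
  class Job
    DEFAULT_JOB_PRIORITY = 65536
    DEFAULT_JOB_DELAY    = 0
    DEFAULT_JOB_TTR      = 10

    attr_reader :args, :priority, :delay, :ttr

    def initialize(*args)
      # A trailing Hash is treated as options.
      opts = args.last.is_a?(::Hash) ? args.pop : nil

      @priority = DEFAULT_JOB_PRIORITY
      @delay    = DEFAULT_JOB_DELAY
      @ttr      = DEFAULT_JOB_TTR

      if opts
        # The :beanstalk key is removed; any remaining keys stay in args.
        beanstalk_opts = opts.delete(:beanstalk)
        args << opts unless opts.empty?

        if beanstalk_opts
          @priority = beanstalk_opts[:priority] if beanstalk_opts[:priority]
          @delay    = beanstalk_opts[:delay]    if beanstalk_opts[:delay]
          @ttr      = beanstalk_opts[:ttr]      if beanstalk_opts[:ttr]
        end
      end

      @args = args.dup.freeze
    end
  end
end

# Mixed options: :beanstalk is consumed, :name remains a job argument.
job = OptionSketch::Job.new(1, 2, { name: "report", beanstalk: { ttr: 30, delay: 5 } })

# Only :beanstalk options: the now-empty Hash is not put back into args.
plain = OptionSketch::Job.new("a", { beanstalk: { priority: 1 } })
```

Note that `opts.delete(:beanstalk)` modifies the Hash that was passed in, so a caller who reuses that Hash will no longer find the `:beanstalk` key in it.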
Instance Attribute Details
#args ⇒ Object (readonly)
Returns the value of attribute args.
    # File 'lib/quebert/job.rb', line 7
    def args
      @args
    end
#delay ⇒ Object
Returns the value of attribute delay.
    # File 'lib/quebert/job.rb', line 8
    def delay
      @delay
    end
#priority ⇒ Object
Returns the value of attribute priority.
    # File 'lib/quebert/job.rb', line 8
    def priority
      @priority
    end
#ttr ⇒ Object
Returns the value of attribute ttr.
    # File 'lib/quebert/job.rb', line 8
    def ttr
      @ttr
    end
Class Method Details
.backend ⇒ Object
    # File 'lib/quebert/job.rb', line 83
    def self.backend
      @backend || Quebert.configuration.backend
    end
.backend=(backend) ⇒ Object
    # File 'lib/quebert/job.rb', line 79
    def self.backend=(backend)
      @backend = backend
    end
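The lookup order in `.backend` (class-level value first, global configuration second) can be sketched as follows. Everything in `BackendSketch` is a hypothetical stand-in, including the `Configuration` struct and the symbol backends; only the two method bodies mirror the source above.

```ruby
# Hypothetical stand-ins; only the backend lookup mirrors the methods above.
module BackendSketch
  Configuration = Struct.new(:backend)

  def self.configuration
    @configuration ||= Configuration.new
  end

  class Job
    def self.backend=(backend)
      @backend = backend
    end

    def self.backend
      # Prefer the value set on this class, else fall back to the global one.
      @backend || BackendSketch.configuration.backend
    end
  end

  class EmailJob < Job; end
  class ReportJob < Job; end
end

BackendSketch.configuration.backend = :global_backend
BackendSketch::EmailJob.backend = :email_backend

email_backend  = BackendSketch::EmailJob.backend   # set on EmailJob itself
report_backend = BackendSketch::ReportJob.backend  # falls back to configuration
```

Because `@backend` is a class-level instance variable, a value assigned on one class is not visible from its subclasses or siblings; each class either has its own value or falls back to the configuration.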
.from_json(json) ⇒ Object
    # File 'lib/quebert/job.rb', line 73
    def self.from_json(json)
      if hash = JSON.parse(json) and not hash.empty?
        Serializer::Job.deserialize(hash)
      end
    end
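The guard in `.from_json` means an empty payload yields `nil` instead of reaching the deserializer. A minimal sketch of that guard, assuming a hypothetical helper `parse_job_payload` that takes the deserializing step as a block (the real method calls `Serializer::Job.deserialize`, whose payload format is not shown here, so the keys below are made up):

```ruby
require "json"

# Hypothetical helper: same guard as .from_json, with the deserializer
# supplied as a block instead of Serializer::Job.deserialize.
def parse_job_payload(json)
  hash = JSON.parse(json)
  return nil unless hash && !hash.empty?

  yield hash
end

# An empty JSON object never reaches the block.
empty_result = parse_job_payload("{}") { |hash| hash }

# A non-empty object is handed to the block; the keys are illustrative only.
job_name = parse_job_payload('{"job":"EmailJob","args":[1]}') { |hash| hash["job"] }
```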
Instance Method Details
#enqueue ⇒ Object
    # File 'lib/quebert/job.rb', line 65
    def enqueue
      self.class.backend.put self, @priority, @delay, @ttr + QUEBERT_TTR_BUFFER
    end
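The effect of `QUEBERT_TTR_BUFFER` in `#enqueue` is that the backend receives a TTR one second longer than the job's own. The sketch below assumes a hypothetical `RecordingBackend` that only records the arguments of `put`, and a stripped-down job that takes the backend as a parameter rather than looking it up on the class.

```ruby
# Hypothetical stand-ins; only the put arguments mirror #enqueue above.
module EnqueueSketch
  QUEBERT_TTR_BUFFER = 1

  # Records what it is asked to store instead of talking to Beanstalk.
  class RecordingBackend
    attr_reader :calls

    def initialize
      @calls = []
    end

    def put(job, priority, delay, ttr)
      @calls << { priority: priority, delay: delay, ttr: ttr }
    end
  end

  class Job
    def initialize(priority, delay, ttr)
      @priority = priority
      @delay = delay
      @ttr = ttr
    end

    def enqueue(backend)
      # The backend is given the job TTR plus the cleanup buffer.
      backend.put self, @priority, @delay, @ttr + QUEBERT_TTR_BUFFER
    end
  end
end

backend = EnqueueSketch::RecordingBackend.new
EnqueueSketch::Job.new(65536, 0, 10).enqueue(backend)
recorded = backend.calls.first
```

With the default TTR of 10, the backend is asked to hold the job for 11 seconds, while `#perform!` still enforces the unbuffered 10.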
#perform(*args) ⇒ Object
    # File 'lib/quebert/job.rb', line 49
    def perform(*args)
      raise NotImplemented
    end
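Since the base `#perform` only raises `NotImplemented`, a subclass is expected to override it. A minimal sketch of that contract, using a hypothetical stand-in base class and a made-up `AddJob` subclass so it runs without Quebert:

```ruby
# Hypothetical stand-ins illustrating the override contract of #perform.
module PerformSketch
  NotImplemented = Class.new(StandardError)

  class Job
    attr_reader :args

    def initialize(*args)
      @args = args.dup.freeze
    end

    # The base class has no work to do.
    def perform(*args)
      raise NotImplemented
    end
  end

  # Made-up subclass: sums its arguments.
  class AddJob < Job
    def perform(*numbers)
      numbers.sum
    end
  end
end

add_job = PerformSketch::AddJob.new(1, 2, 3)
sum = add_job.perform(*add_job.args)

base_raised = begin
  PerformSketch::Job.new.perform
  false
rescue PerformSketch::NotImplemented
  true
end
```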
#perform! ⇒ Object
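Runs the perform method that somebody else should be implementing. The call is wrapped in a timeout of the job's TTR; if the timeout expires, Job::Timeout is raised.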
    # File 'lib/quebert/job.rb', line 54
    def perform!
      # Honor the timeout and kill the job in ruby-space. Beanstalk
      # should be cleaning up this job and returning it to the queue
      # as well.
      begin
        Quebert::Timeout.timeout(@ttr){ perform(*args) }
      rescue ::Timeout::Error
        raise Job::Timeout
      end
    end
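The timeout translation in `#perform!` can be sketched with Ruby's standard `Timeout` module. This is an assumption made for illustration: the source calls `Quebert::Timeout.timeout`, whose implementation is not shown here. `TimeoutSketch`, `JobTimeout`, and `perform_with_ttr` are hypothetical names.

```ruby
require "timeout"

# Hypothetical stand-ins; stdlib Timeout is used in place of Quebert::Timeout.
module TimeoutSketch
  # Like Job::Timeout above, this descends from Exception, not StandardError,
  # so a bare `rescue` will not catch it.
  JobTimeout = Class.new(Exception)

  def self.perform_with_ttr(ttr)
    ::Timeout.timeout(ttr) { yield }
  rescue ::Timeout::Error
    # Translate the generic timeout into the job-level signal.
    raise JobTimeout
  end
end

# Work that finishes in time returns the block's value.
fast_result = TimeoutSketch.perform_with_ttr(1) { :done }

# Work that overruns the TTR surfaces as JobTimeout.
timed_out = begin
  TimeoutSketch.perform_with_ttr(0.05) { sleep 1 }
  false
rescue TimeoutSketch::JobTimeout
  true
end
```

Callers that want to handle the timeout have to rescue the class by name, since it sits outside the `StandardError` hierarchy.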
#to_json ⇒ Object
    # File 'lib/quebert/job.rb', line 69
    def to_json
      JSON.generate(Serializer::Job.serialize(self))
    end