Class: Quebert::Job

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/quebert/job.rb

Defined Under Namespace

Classes: Priority

Constant Summary collapse

DEFAULT_JOB_DELAY =

Delay a job for 0 seconds on the jobqueue

0
DEFAULT_JOB_TTR =

By default, the job should live for 10 seconds tops.

10
QUEBERT_TTR_BUFFER =

A buffer time in seconds added to the Beanstalk TTR for Quebert to do its own job cleanup The job will perform based on the Beanstalk TTR, but Beanstalk hangs on to the job just a little longer so that Quebert can bury the job or schedule a retry with the appropriate delay

5
NotImplemented =

Exceptions are used for signaling job status… ewww. Yank this out and replace with a more well thought out controller.

Class.new(StandardError)
Action =
Class.new(Exception)
Bury =
Class.new(Action)
Delete =
Class.new(Action)
Release =
Class.new(Action)
Timeout =
Class.new(Action)
Retry =
Class.new(Action)

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) {|_self| ... } ⇒ Job

Returns a new instance of Job.

Yields:

  • (_self)

Yield Parameters:

  • _self (Quebert::Job)

    the object that the method was called on



39
40
41
42
43
44
45
46
# File 'lib/quebert/job.rb', line 39

def initialize(*args)
  @priority = Job::Priority::MEDIUM
  @delay    = DEFAULT_JOB_DELAY
  @ttr      = DEFAULT_JOB_TTR
  @args     = args.dup.freeze
  yield self if block_given?
  self
end

Instance Attribute Details

#argsObject (readonly)

Returns the value of attribute args.



8
9
10
# File 'lib/quebert/job.rb', line 8

def args
  @args
end

#delayObject

Returns the value of attribute delay.



9
10
11
# File 'lib/quebert/job.rb', line 9

def delay
  @delay
end

#priorityObject

Returns the value of attribute priority.



9
10
11
# File 'lib/quebert/job.rb', line 9

def priority
  @priority
end

#ttrObject

Returns the value of attribute ttr.



9
10
11
# File 'lib/quebert/job.rb', line 9

def ttr
  @ttr
end

Class Method Details

.backendObject



90
91
92
# File 'lib/quebert/job.rb', line 90

def self.backend
  @backend || Quebert.configuration.backend
end

.backend=(backend) ⇒ Object



86
87
88
# File 'lib/quebert/job.rb', line 86

def self.backend=(backend)
  @backend = backend
end

.from_json(json) ⇒ Object

Read a JSON string and convert into a hash that Ruby can deal with.



80
81
82
83
84
# File 'lib/quebert/job.rb', line 80

def self.from_json(json)
  if hash = JSON.parse(json) and not hash.empty?
    Serializer::Job.deserialize(hash)
  end
end

Instance Method Details

#enqueue(opts = {}) ⇒ Object

Accepts arguments that override the job options and enqueu this stuff.



69
70
71
72
# File 'lib/quebert/job.rb', line 69

def enqueue(opts={})
  opts.each { |opt, val| self.send("#{opt}=", val) }
  self.class.backend.put self, @priority, @delay, @ttr + QUEBERT_TTR_BUFFER
end

#perform(*args) ⇒ Object

Raises:



48
49
50
# File 'lib/quebert/job.rb', line 48

def perform(*args)
  raise NotImplemented
end

#perform!Object

Runs the perform method that somebody else should be implementing



53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/quebert/job.rb', line 53

def perform!
  Quebert.config.before_job(self)
  Quebert.config.around_job(self)

  # Honor the timeout and kill the job in ruby-space. Beanstalk
  # should be cleaning up this job and returning it to the queue
  # as well.
  val = ::Timeout.timeout(@ttr, Job::Timeout){ perform(*args) }

  Quebert.config.around_job(self)
  Quebert.config.after_job(self)

  val
end

#to_jsonObject

Serialize the job into a JSON string that we can put on the beandstalkd queue.



75
76
77
# File 'lib/quebert/job.rb', line 75

def to_json
  JSON.generate(Serializer::Job.serialize(self))
end