Class: Quebert::Job

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/quebert/job.rb

Constant Summary collapse

DEFAULT_JOB_PRIORITY =
65536
DEFAULT_JOB_DELAY =
0
DEFAULT_JOB_TTR =
10
QUEBERT_TTR_BUFFER =

A buffer time, in seconds, added to the Beanstalk TTR so that Quebert can do its own job cleanup. The job will perform based on the Beanstalk TTR, but Beanstalk hangs on to the job just a little longer so that Quebert can bury the job or schedule a retry with the appropriate delay.

1
NotImplemented =
Class.new(StandardError)
Action =
Class.new(Exception)
Bury =
Class.new(Action)
Delete =
Class.new(Action)
Release =
Class.new(Action)
Timeout =
Class.new(Action)

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Job

Returns a new instance of Job.



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/quebert/job.rb', line 28

# Builds a job from arbitrary positional arguments plus an optional trailing
# options hash.
#
# When the last argument is a Hash, its +:beanstalk+ entry (if any) is read as
# queue settings and is not kept among the job arguments; truthy +:priority+,
# +:delay+ and +:ttr+ values found there override the defaults. Whatever else
# the hash contains is kept as the final job argument.
#
# @param args [Array] job arguments, optionally ending with an options Hash
def initialize(*args)
  @priority = DEFAULT_JOB_PRIORITY
  @delay = DEFAULT_JOB_DELAY
  @ttr = DEFAULT_JOB_TTR

  if args.last.is_a?(::Hash)
    # Work on a copy: the trailing hash belongs to the caller, and deleting
    # :beanstalk from it in place would strip the queue settings from a hash
    # the caller may reuse (and would raise if that hash is frozen).
    opts = args.pop.dup
    beanstalk_opts = opts.delete(:beanstalk)

    # An options hash that carried nothing but :beanstalk is dropped entirely.
    args << opts unless opts.empty?

    if beanstalk_opts
      @priority = beanstalk_opts[:priority] if beanstalk_opts[:priority]
      @delay = beanstalk_opts[:delay] if beanstalk_opts[:delay]
      @ttr = beanstalk_opts[:ttr] if beanstalk_opts[:ttr]
    end
  end

  # Store a frozen private copy so the argument list cannot be altered later.
  @args = args.dup.freeze
end

Instance Attribute Details

#args ⇒ Object (readonly)

Returns the value of attribute args.



7
8
9
# File 'lib/quebert/job.rb', line 7

# Arguments this job was built with.
#
# @return [Array] the frozen copy of the argument list stored by #initialize
def args
  @args
end

#delay ⇒ Object

Returns the value of attribute delay.



8
9
10
# File 'lib/quebert/job.rb', line 8

# Delay value that #enqueue passes to the backend.
#
# @return [Object] the value of @delay; #initialize sets it to
#   DEFAULT_JOB_DELAY unless a :delay is given under the :beanstalk option
def delay
  @delay
end

#priority ⇒ Object

Returns the value of attribute priority.



8
9
10
# File 'lib/quebert/job.rb', line 8

# Priority value that #enqueue passes to the backend.
#
# @return [Object] the value of @priority; #initialize sets it to
#   DEFAULT_JOB_PRIORITY unless a :priority is given under the :beanstalk option
def priority
  @priority
end

#ttr ⇒ Object

Returns the value of attribute ttr.



8
9
10
# File 'lib/quebert/job.rb', line 8

# Time-to-run limit for the job. #perform! uses it as the timeout around
# #perform, and #enqueue passes it (plus QUEBERT_TTR_BUFFER) to the backend.
#
# @return [Object] the value of @ttr; #initialize sets it to DEFAULT_JOB_TTR
#   unless a :ttr is given under the :beanstalk option
def ttr
  @ttr
end

Class Method Details

.backend ⇒ Object



83
84
85
# File 'lib/quebert/job.rb', line 83

# Backend used to enqueue jobs of this class.
#
# @return [Object] the backend assigned through .backend= when one is set
#   (and truthy), otherwise Quebert.configuration.backend
def self.backend
  return @backend if @backend

  Quebert.configuration.backend
end

.backend=(backend) ⇒ Object



79
80
81
# File 'lib/quebert/job.rb', line 79

# Assigns the backend for this class. While the stored value is truthy,
# .backend returns it instead of Quebert.configuration.backend.
#
# @param backend [Object] backend to use; nil or false makes .backend fall
#   back to the configured backend
def self.backend=(backend)
  @backend = backend
end

.from_json(json) ⇒ Object



73
74
75
76
77
# File 'lib/quebert/job.rb', line 73

# Rebuilds a job from its JSON representation.
#
# @param json [String] JSON text to parse
# @return [Object, nil] whatever Serializer::Job.deserialize returns for the
#   parsed document, or nil when that document is nil/false or empty
def self.from_json(json)
  payload = JSON.parse(json)
  return unless payload
  return if payload.empty?

  Serializer::Job.deserialize(payload)
end

Instance Method Details

#enqueue ⇒ Object



65
66
67
# File 'lib/quebert/job.rb', line 65

# Puts this job on the class's backend.
#
# The TTR handed to the backend is the job's own TTR plus QUEBERT_TTR_BUFFER.
#
# @return [Object] whatever the backend's +put+ returns
def enqueue
  queue = self.class.backend
  reserved_ttr = @ttr + QUEBERT_TTR_BUFFER
  queue.put(self, @priority, @delay, reserved_ttr)
end

#perform(*args) ⇒ Object

Raises:
  • (NotImplemented)



49
50
51
# File 'lib/quebert/job.rb', line 49

# Placeholder for the job's actual work: this version accepts any arguments
# and always raises. #perform! calls it with the job's stored args.
#
# @param args [Array] arguments for the job (ignored here)
# @raise [NotImplemented] always
def perform(*args)
  raise NotImplemented
end

#perform! ⇒ Object

Runs the perform method that somebody else should be implementing



54
55
56
57
58
59
60
61
62
63
# File 'lib/quebert/job.rb', line 54

# Runs #perform with the job's stored arguments, bounded by the job's TTR.
#
# The time limit is enforced on the Ruby side so the job is stopped here;
# Beanstalk is expected to clean the job up and return it to the queue too.
#
# @return [Object] whatever #perform returns
# @raise [Job::Timeout] when the timeout wrapper raises ::Timeout::Error
def perform!
  Quebert::Timeout.timeout(@ttr) { perform(*args) }
rescue ::Timeout::Error
  raise Job::Timeout
end

#to_json ⇒ Object



69
70
71
# File 'lib/quebert/job.rb', line 69

# Serializes this job to a JSON string.
#
# Any arguments are accepted and ignored. Ruby's JSON generator calls
# +to_json(state)+ on objects nested inside an Array or Hash, so a zero-arity
# definition would raise ArgumentError whenever a job is serialized as part
# of a larger structure.
#
# @return [String] JSON text for what Serializer::Job.serialize returns for
#   this job
def to_json(*_args)
  JSON.generate(Serializer::Job.serialize(self))
end