Class: Delayed::Periodic

Inherits:
Object
  • Object
show all
Defined in:
lib/delayed/periodic.rb

Constant Summary collapse

STRAND =
'periodic scheduling'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, cron_line, job_args, block) ⇒ Periodic

Returns a new instance of Periodic.



50
51
52
53
54
55
# File 'lib/delayed/periodic.rb', line 50

# Builds a periodic job definition.
#
# @param name [Object] identifier for the job; stored in @name
# @param cron_line [String] cron expression, parsed by Rufus::Scheduler::CronLine
# @param job_args [Hash] extra job options; keys are symbolized and override
#   the default :priority of Delayed::LOW_PRIORITY
# @param block [Proc] the work to execute when the job is performed
def initialize(name, cron_line, job_args, block)
  @name  = name
  @cron  = Rufus::Scheduler::CronLine.new(cron_line)

  # Caller-supplied options win over the low-priority default.
  defaults  = { :priority => Delayed::LOW_PRIORITY }
  @job_args = defaults.merge(job_args.symbolize_keys)

  @block = block
end

Instance Attribute Details

#cron ⇒ Object (readonly)

Returns the value of attribute cron.



5
6
7
# File 'lib/delayed/periodic.rb', line 5

# Reader for the job's schedule.
#
# @return [Object] the value held in @cron
def cron
  @cron
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



5
6
7
# File 'lib/delayed/periodic.rb', line 5

# Reader for the job's name.
#
# @return [Object] the value held in @name
def name
  @name
end

Class Method Details

.add_overrides(overrides) ⇒ Object



21
22
23
24
25
26
27
# File 'lib/delayed/periodic.rb', line 21

# Registers replacement cron lines, keyed by job name.
#
# Every cron line is parsed first, so a malformed entry raises before
# anything is merged into the stored overrides.
#
# @param overrides [Hash] job name => cron line
# @return [Hash] the stored overrides after merging
def self.add_overrides(overrides)
  # Parsing is done purely for validation; CronLine.new raises on bad input.
  overrides.each_value { |line| Rufus::Scheduler::CronLine.new(line) }

  self.overrides.merge!(overrides)
end

.audit_queue ⇒ Object



37
38
39
40
41
42
# File 'lib/delayed/periodic.rb', line 37

# Entry point for auditing the periodic job queue.
#
# @return [Object] whatever perform_audit! returns
def self.audit_queue
  # Historically the audit was itself queued as a job on a strand. Periodic
  # jobs are now scheduled as singletons, so running the audit in-line here
  # carries no risk of creating duplicates.
  self.perform_audit!
end

.cron(job_name, cron_line, job_args = {}, &block) ⇒ Object

Raises:

  • (ArgumentError)


31
32
33
34
35
# File 'lib/delayed/periodic.rb', line 31

# Declares a periodic job.
#
# @param job_name [Object] key the job is registered under
# @param cron_line [String] cron line to use unless an override exists for job_name
# @param job_args [Hash] extra options handed to the new instance
# @param block [Proc] the work to run
# @return [Object] the newly registered instance
# @raise [ArgumentError] if job_name is already registered
def self.cron(job_name, cron_line, job_args = {}, &block)
  if scheduled[job_name]
    raise ArgumentError, "job #{job_name} already scheduled!"
  end

  # An override registered for this job name takes precedence.
  effective_line = overrides[job_name] || cron_line
  scheduled[job_name] = new(job_name, effective_line, job_args, block)
end

.now ⇒ Object



77
78
79
# File 'lib/delayed/periodic.rb', line 77

# Current time as reported by Time.zone.
#
# @return [Object] the result of Time.zone.now
def self.now
  zone = Time.zone
  zone.now
end

.perform_audit! ⇒ Object

Make sure all periodic jobs are scheduled for their next run in the job queue. This auditing should run on the strand.



46
47
48
# File 'lib/delayed/periodic.rb', line 46

# Calls enqueue on every registered periodic job.
#
# @return [Hash] the registry of scheduled jobs
def self.perform_audit!
  scheduled.each_value(&:enqueue)
end

.yaml_new(klass, tag, val) ⇒ Object



13
14
15
# File 'lib/delayed/periodic.rb', line 13

# Resolves a serialized job name back to its registered instance.
#
# @param klass [Object] unused
# @param tag [Object] unused
# @param val [Object] the job name to look up
# @return [Object] the registered job for val
# @raise [NameError] if no job is registered under val
def self.yaml_new(klass, tag, val)
  periodic = scheduled[val]
  return periodic if periodic

  raise NameError, "job #{val} is no longer scheduled"
end

Instance Method Details

#enqueue ⇒ Object



57
58
59
# File 'lib/delayed/periodic.rb', line 57

# Queues this job for its next cron occurrence.
#
# The stored job options are merged with :max_attempts, :run_at and
# :singleton, which always take precedence over stored values.
#
# @return [Object] whatever Delayed::Job.enqueue returns
def enqueue
  next_run = @cron.next_time(Delayed::Periodic.now)
  forced   = { :max_attempts => 1, :run_at => next_run, :singleton => tag }

  Delayed::Job.enqueue(self, @job_args.merge(forced))
end

#perform ⇒ Object



61
62
63
64
65
66
67
68
69
70
# File 'lib/delayed/periodic.rb', line 61

# Runs the job's block, then always re-enqueues the job.
#
# Re-enqueueing happens in an ensure clause, so it runs whether or not the
# block raised. An error raised by the block still propagates to the caller;
# an error raised while re-enqueueing is logged and swallowed.
#
# @return [Object] the block's return value
def perform
  @block.call
ensure
  begin
    enqueue
  rescue StandardError => e
    # Double failure: the job is not back on the queue, so the auditor
    # will have to catch this.
    Rails.logger.error "Failure enqueueing periodic job! #{@name} #{e.inspect}"
  end
end

#tag ⇒ Object Also known as: display_name



72
73
74
# File 'lib/delayed/periodic.rb', line 72

# Label for this job: the job name prefixed with "periodic: ".
#
# @return [String]
def tag
  prefix = 'periodic'
  "#{prefix}: #{@name}"
end

#to_yaml(opts = {}) ⇒ Object



9
10
11
# File 'lib/delayed/periodic.rb', line 9

# Serializes the job as a single tagged YAML scalar holding its name.
#
# @param opts [Hash] options passed through to YAML.quick_emit
# @return [Object] whatever YAML.quick_emit returns
def to_yaml(opts = {})
  YAML.quick_emit(object_id, opts) do |emitter|
    emitter.scalar(taguri, @name)
  end
end