Class: Delayed::Periodic

Inherits:
Object
  • Object
show all
Defined in:
lib/delayed/periodic.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, cron_line, job_args, block) ⇒ Periodic

Returns a new instance of Periodic.


44
45
46
47
48
49
# File 'lib/delayed/periodic.rb', line 44

# Builds a periodic job definition from a name, a cron expression and a block.
#
# @param name [Object] identifier kept in @name (it appears in the job tag and in log messages)
# @param cron_line [String] cron expression, parsed with Fugit.do_parse_cron
#   (which raises on a malformed line)
# @param job_args [Hash] extra enqueue options; run through symbolize_keys and
#   layered over a default :priority of Delayed::LOW_PRIORITY
# @param block [#call] the work executed each time the job is performed
def initialize(name, cron_line, job_args, block)
  @name = name
  @block = block
  @cron = Fugit.do_parse_cron(cron_line)
  # caller-supplied options win over the low-priority default
  default_args = { priority: Delayed::LOW_PRIORITY }
  @job_args = default_args.merge(job_args.symbolize_keys)
end

Instance Attribute Details

#cron ⇒ Object (readonly)

Returns the value of attribute cron


7
8
9
# File 'lib/delayed/periodic.rb', line 7

# Reader for the schedule held in @cron.
#
# @return [Object] the value of @cron (the constructor assigns it the result
#   of Fugit.do_parse_cron)
def cron
  @cron
end

#name ⇒ Object (readonly)

Returns the value of attribute name


7
8
9
# File 'lib/delayed/periodic.rb', line 7

# Reader for the job name held in @name.
#
# @return [Object] the value of @name, as given to the constructor
def name
  @name
end

Class Method Details

.add_overrides(overrides) ⇒ Object


17
18
19
20
21
22
23
# File 'lib/delayed/periodic.rb', line 17

# Validates and registers replacement cron lines, keyed by job name.
#
# Every cron line is parsed before anything is merged, so a malformed entry
# raises and leaves the registered overrides untouched.
#
# @param overrides [Hash] job name => cron line
# @return [Object] the result of merging into self.overrides
def self.add_overrides(overrides)
  # parsed purely for validation: Fugit.do_parse_cron raises on a malformed line
  overrides.each_value { |cron_line| Fugit.do_parse_cron(cron_line) }
  self.overrides.merge!(overrides)
end

.audit_queue ⇒ Object


31
32
33
34
35
36
# File 'lib/delayed/periodic.rb', line 31

# Entry point for auditing the periodic jobs; delegates directly to
# perform_audit! and runs it in-line.
#
# @return [Object] whatever perform_audit! returns
def self.audit_queue
  # we used to queue up a job in a strand here, and perform the audit inside that job
  # however, now that we're using singletons for scheduling periodic jobs,
  # it's fine to just do the audit in-line here without risk of creating duplicates
  perform_audit!
end

.cron(job_name, cron_line, job_args = {}, &block) ⇒ Object

Raises:

  • (ArgumentError)

25
26
27
28
29
# File 'lib/delayed/periodic.rb', line 25

# Registers a periodic job under +job_name+.
#
# A cron line registered for the same name in +overrides+ takes precedence
# over the +cron_line+ argument.
#
# @param job_name [Object] key under which the job is stored in +scheduled+
# @param cron_line [String] default cron expression for the job
# @param job_args [Hash] extra enqueue options handed to the new instance
# @param block [Proc] the work to run; forwarded to the constructor
# @return [Object] the newly created instance stored in +scheduled+
# @raise [ArgumentError] if a job with this name is already scheduled
def self.cron(job_name, cron_line, job_args = {}, &block)
  if scheduled[job_name]
    raise ArgumentError, "job #{job_name} already scheduled!"
  end

  effective_line = overrides[job_name] || cron_line
  scheduled[job_name] = new(job_name, effective_line, job_args, block)
end

.now ⇒ Object


81
82
83
# File 'lib/delayed/periodic.rb', line 81

# Current time as reported by Time.zone; enqueue_args uses it as the
# reference point for computing the next run.
#
# @return [Object] the result of Time.zone.now
def self.now
  current_zone = Time.zone
  current_zone.now
end

.perform_audit! ⇒ Object

Make sure all periodic jobs are scheduled for their next run in the job queue. This auditing should run on the strand.


40
41
42
# File 'lib/delayed/periodic.rb', line 40

# Calls #enqueue on every registered periodic job.
#
# @return [Object] the +scheduled+ collection that was iterated
def self.perform_audit!
  scheduled.each_value(&:enqueue)
end

Instance Method Details

#encode_with(coder) ⇒ Object


9
10
11
# File 'lib/delayed/periodic.rb', line 9

# Serialization hook: emits this job as a single tagged scalar whose value is
# the job name, so only the name is written out rather than the instance's
# other state.
# NOTE(review): presumably invoked by Psych/YAML when the job is dumped — confirm.
#
# @param coder [#scalar] receiver of the tag and the scalar value
# @return [Object] whatever coder.scalar returns
def encode_with(coder)
  type_tag = "!ruby/Delayed::Periodic"
  coder.scalar(type_tag, @name)
end

#enqueue ⇒ Object


51
52
53
# File 'lib/delayed/periodic.rb', line 51

# Hands this periodic job to Delayed::Job.enqueue, passing the options built
# by enqueue_args as keyword arguments.
#
# @return [Object] whatever Delayed::Job.enqueue returns
def enqueue
  options = enqueue_args
  Delayed::Job.enqueue(self, **options)
end

#enqueue_args ⇒ Object


55
56
57
58
59
60
61
62
63
# File 'lib/delayed/periodic.rb', line 55

# Builds the option hash used to enqueue the next run of this job.
#
# Starts from the options stored in @job_args and overlays the scheduling
# options computed here, so :max_attempts, :run_at, :singleton and
# :on_conflict always take the values below, even if @job_args supplied them.
#
# @return [Hash] a new hash; @job_args itself is not modified
def enqueue_args
  # next cron occurrence after "now", converted via utc.to_time
  next_run = @cron.next_time(Delayed::Periodic.now).utc.to_time
  @job_args.merge(
    max_attempts: 1,
    run_at: next_run,
    singleton: tag,
    on_conflict: :patient
  )
end

#perform ⇒ Object


65
66
67
68
69
70
71
72
73
74
# File 'lib/delayed/periodic.rb', line 65

# Runs the job's block, then always tries to enqueue the next run, whether
# the block succeeded or raised.
#
# An error raised by the block still propagates to the caller. An error raised
# while re-enqueueing is logged and swallowed instead.
#
# @return [Object] the block's return value
def perform
  @block.call
ensure
  begin
    enqueue
  rescue StandardError => e
    # double fail! the auditor will have to catch this.
    Rails.logger.error "Failure enqueueing periodic job! #{@name} #{e.inspect}"
  end
end

#tag ⇒ Object Also known as: display_name


76
77
78
# File 'lib/delayed/periodic.rb', line 76

# Label for this periodic job: the job name prefixed with "periodic: ".
# enqueue_args passes this value as the :singleton option.
#
# @return [String]
def tag
  "periodic: #{@name}"
end