Class: Delayed::Periodic

Inherits:
Object
  • Object
show all
Defined in:
lib/delayed/periodic.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, cron_line, job_args, block) ⇒ Periodic

Returns a new instance of Periodic.



51
52
53
54
55
56
# File 'lib/delayed/periodic.rb', line 51

# Builds a periodic job definition.
#
# @param name the job name, stored unchanged in @name
# @param cron_line the cron expression; parsed with Fugit.do_parse_cron
# @param job_args options hash; its keys are symbolized and merged over a
#   default of `priority: Delayed::LOW_PRIORITY`, so caller values win
# @param block the callable stored in @block (invoked by #perform)
def initialize(name, cron_line, job_args, block)
  default_args = { priority: Delayed::LOW_PRIORITY }

  @name = name
  @cron = Fugit.do_parse_cron(cron_line)
  # Caller-supplied arguments override the default priority.
  @job_args = default_args.merge(job_args.symbolize_keys)
  @block = block
end

Instance Attribute Details

#cron ⇒ Object (readonly)

Returns the value of attribute cron.



7
8
9
# File 'lib/delayed/periodic.rb', line 7

# Reader for @cron, which #initialize sets to the result of
# Fugit.do_parse_cron(cron_line).
def cron
  @cron
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



7
8
9
# File 'lib/delayed/periodic.rb', line 7

# Reader for @name, the job name passed to #initialize. The same value is
# interpolated into #tag and into the error logged by #perform.
def name
  @name
end

Class Method Details

.add_overrides(overrides) ⇒ Object



17
18
19
20
21
22
23
# File 'lib/delayed/periodic.rb', line 17

# Validates a set of cron-line overrides and merges them into the
# class-level overrides.
#
# Every cron line is parsed before anything is merged, so a malformed line
# raises out of Fugit.do_parse_cron and leaves the existing overrides as-is.
#
# @param overrides pairs of job name => cron line
# @return the value returned by `self.overrides.merge!`
def self.add_overrides(overrides)
  # Parsing is for validation only; the parsed result is discarded.
  overrides.each { |_job_name, line| Fugit.do_parse_cron(line) }

  # `self.` is needed here because the parameter shadows the class method.
  self.overrides.merge!(overrides)
end

.audit_queue ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/delayed/periodic.rb', line 32

# Runs perform_audit! in-line, inside a Delayed::Job transaction, when the
# advisory lock can be obtained; otherwise does nothing this time around.
def self.audit_queue
  # Historically this queued a job in a strand and audited from inside that
  # job. Periodic jobs are now scheduled as singletons, so auditing in-line
  # carries no risk of creating duplicates.
  Delayed::Job.transaction do
    # For db performance only one process needs to audit at a time: when the
    # advisory lock is unavailable we simply skip and try again soon.
    lock_acquired = Delayed::Job.attempt_advisory_lock("Delayed::Periodic#audit_queue")
    perform_audit! if lock_acquired
  end
end

.cron(job_name, cron_line, job_args = {}, &block) ⇒ Object

Raises:

  • (ArgumentError)


25
26
27
28
29
30
# File 'lib/delayed/periodic.rb', line 25

# Registers a periodic job under job_name.
#
# @param job_name key used in `scheduled` and `overrides`
# @param cron_line cron expression used unless `overrides` has an entry for
#   job_name
# @param job_args extra arguments passed through to the new instance
# @param block the work to run; passed to the constructor as a plain argument
# @return the newly created instance, which is also stored in `scheduled`
# @raise [ArgumentError] if a job with this name is already scheduled
def self.cron(job_name, cron_line, job_args = {}, &block)
  if scheduled[job_name]
    raise ArgumentError, "job #{job_name} already scheduled!"
  end

  # A configured override takes precedence over the line given by the caller.
  effective_line = overrides[job_name] || cron_line
  scheduled[job_name] = new(job_name, effective_line, job_args, block)
end

.now ⇒ Object



88
89
90
# File 'lib/delayed/periodic.rb', line 88

# Current time as reported by Time.zone; #enqueue_args uses it as the
# reference point for computing the next run.
def self.now
  zone = Time.zone
  zone.now
end

.perform_audit! ⇒ Object

Makes sure all periodic jobs are scheduled for their next run in the job queue. This auditing should run on the strand.



47
48
49
# File 'lib/delayed/periodic.rb', line 47

# Calls #enqueue on every entry in `scheduled`.
#
# @return whatever iterating `scheduled` returns (the collection itself for a Hash)
def self.perform_audit!
  scheduled.each_value(&:enqueue)
end

Instance Method Details

#encode_with(coder) ⇒ Object



9
10
11
# File 'lib/delayed/periodic.rb', line 9

# Serialization hook: emits this object as a single scalar holding only the
# job name, tagged "!ruby/Delayed::Periodic".
#
# @param coder the object whose `scalar` method receives the tag and name
def encode_with(coder)
  yaml_tag = "!ruby/Delayed::Periodic"
  coder.scalar(yaml_tag, @name)
end

#enqueue ⇒ Object



58
59
60
# File 'lib/delayed/periodic.rb', line 58

# Enqueues this object as a job, passing the hash from #enqueue_args as
# keyword arguments.
#
# @return the value returned by Delayed::Job.enqueue
def enqueue
  options = enqueue_args
  Delayed::Job.enqueue(self, **options)
end

#enqueue_args ⇒ Object



62
63
64
65
66
67
68
69
70
# File 'lib/delayed/periodic.rb', line 62

# Builds the keyword arguments used by #enqueue.
#
# Starts from @job_args and overlays the values below, so max_attempts,
# run_at, singleton and on_conflict always take precedence over anything
# supplied by the caller. @job_args itself is not modified.
#
# @return [Hash] a new hash of enqueue options
def enqueue_args
  # Next run according to the cron line, relative to Delayed::Periodic.now.
  next_run = @cron.next_time(Delayed::Periodic.now).utc.to_time

  @job_args.merge(
    max_attempts: 1,
    run_at: next_run,
    singleton: tag,
    on_conflict: :patient
  )
end

#perform ⇒ Object



72
73
74
75
76
77
78
79
80
81
# File 'lib/delayed/periodic.rb', line 72

# Runs the stored block, then always schedules the next occurrence, whether
# the block succeeded or raised.
#
# A failure while re-enqueueing is logged and swallowed, so it neither masks
# an exception from the block nor changes the return value.
#
# @return the block's return value
def perform
  @block.call
ensure
  begin
    enqueue
  rescue StandardError => e
    # Double failure: nothing more can be done here, so the auditor will
    # have to catch this and reschedule the job.
    Rails.logger.error "Failure enqueueing periodic job! #{@name} #{e.inspect}"
  end
end

#tag ⇒ Object Also known as: display_name



83
84
85
# File 'lib/delayed/periodic.rb', line 83

# Label for this periodic job: the job name prefixed with "periodic: ".
# #enqueue_args passes it as the :singleton value.
def tag
  job_name = @name
  "periodic: #{job_name}"
end