Class: Delayed::Worker::HealthCheck

Inherits:
Object
  • Object
show all
Defined in:
lib/delayed/worker/health_check.rb

Direct Known Subclasses

ConsulHealthCheck, NullHealthCheck

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(worker_name:, config: {}) ⇒ HealthCheck

Returns a new instance of HealthCheck.



79
80
81
82
# File 'lib/delayed/worker/health_check.rb', line 79

def initialize(worker_name:, config: {})
  @config = config.with_indifferent_access
  @worker_name = worker_name
end

Class Attribute Details

.subclassesObject (readonly)

Returns the value of attribute subclasses.



10
11
12
# File 'lib/delayed/worker/health_check.rb', line 10

def subclasses
  @subclasses
end

.type_nameObject

Returns the value of attribute type_name.



9
10
11
# File 'lib/delayed/worker/health_check.rb', line 9

def type_name
  @type_name
end

Instance Attribute Details

#configObject

Returns the value of attribute config.



77
78
79
# File 'lib/delayed/worker/health_check.rb', line 77

def config
  @config
end

#worker_nameObject

Returns the value of attribute worker_name.



77
78
79
# File 'lib/delayed/worker/health_check.rb', line 77

def worker_name
  @worker_name
end

Class Method Details

.attempt_advisory_lockObject



69
70
71
72
73
74
# File 'lib/delayed/worker/health_check.rb', line 69

def attempt_advisory_lock
  lock_name = "Delayed::Worker::HealthCheck#reschedule_abandoned_jobs"
  conn = ActiveRecord::Base.connection
  fn_name = conn.quote_table_name("half_md5_as_bigint")
  conn.select_value("SELECT pg_try_advisory_xact_lock(#{fn_name}('#{lock_name}'));")
end

.build(type:, worker_name:, config: {}) ⇒ Object

Raises:

  • (ArgumentError)


17
18
19
20
21
22
23
# File 'lib/delayed/worker/health_check.rb', line 17

def build(type:, worker_name:, config: {})
  type = type.to_sym
  klass = @subclasses.find { |sc| sc.type_name == type }
  raise ArgumentError, "Unable to build a HealthCheck for type #{type}" unless klass

  klass.new(worker_name: worker_name, config: config)
end

.inherited(subclass) ⇒ Object



12
13
14
15
# File 'lib/delayed/worker/health_check.rb', line 12

def inherited(subclass)
  @subclasses << subclass
  super
end

.reschedule_abandoned_jobsObject



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/delayed/worker/health_check.rb', line 25

def reschedule_abandoned_jobs
  return if Settings.worker_health_check_type == :none

  Delayed::Job.transaction do
    # this action is a special case, and SHOULD NOT be a periodic job
    # because if it gets wiped out suddenly during execution
    # it can't go clean up it's abandoned self.  Therefore,
    # we expect it to get run from it's own process forked from the job pool
    # and we try to get an advisory lock when it runs.  If we succeed,
    # no other worker is trying to do this right now (and if we abandon the
    # operation, the transaction will end, releasing the advisory lock).
    result = attempt_advisory_lock
    return unless result

    checker = Worker::HealthCheck.build(
      type: Settings.worker_health_check_type,
      config: Settings.worker_health_check_config,
      worker_name: "cleanup-crew"
    )
    live_workers = checker.live_workers

    Delayed::Job.running_jobs.each do |job|
      # prefetched jobs have their own way of automatically unlocking themselves
      next if job.locked_by.start_with?("prefetch:")

      next if live_workers.include?(job.locked_by)

      begin
        Delayed::Job.transaction do
          # double check that the job is still there. locked_by will immediately be reset
          # to nil in this transaction by Job#reschedule
          next unless Delayed::Job.where(id: job,
                                         locked_by: job.locked_by)
                                  .update_all(locked_by: "abandoned job cleanup") == 1

          job.reschedule
        end
      rescue
        ::Rails.logger.error "Failure rescheduling abandoned job #{job.id} #{$!.inspect}"
      end
    end
  end
end

Instance Method Details

#live_workersObject

Raises:

  • (NotImplementedError)


92
93
94
# File 'lib/delayed/worker/health_check.rb', line 92

def live_workers
  raise NotImplementedError
end

#startObject

Raises:

  • (NotImplementedError)


84
85
86
# File 'lib/delayed/worker/health_check.rb', line 84

def start
  raise NotImplementedError
end

#stopObject

Raises:

  • (NotImplementedError)


88
89
90
# File 'lib/delayed/worker/health_check.rb', line 88

def stop
  raise NotImplementedError
end