Class: Delayed::Worker::HealthCheck

Inherits:
Object
  • Object
show all
Defined in:
lib/delayed/worker/health_check.rb

Direct Known Subclasses

ConsulHealthCheck, NullHealthCheck

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(worker_name:, config: {}) ⇒ HealthCheck

Returns a new instance of HealthCheck.



77
78
79
80
# File 'lib/delayed/worker/health_check.rb', line 77

# Builds a health check bound to a single worker.
#
# @param worker_name [Object] stored as-is and exposed through #worker_name
# @param config [#with_indifferent_access] settings for this health check;
#   the result of calling +with_indifferent_access+ on it is what gets stored
def initialize(worker_name:, config: {})
  @worker_name, @config = worker_name, config.with_indifferent_access
end

Class Attribute Details

.subclassesObject (readonly)

Returns the value of attribute subclasses.



10
11
12
# File 'lib/delayed/worker/health_check.rb', line 10

# Class-level reader for the registry of health check subclasses.
# The +inherited+ hook appends to this collection and +build+ searches it.
#
# @return [Object] the current value of @subclasses
def subclasses
  @subclasses
end

.type_nameObject

Returns the value of attribute type_name.



9
10
11
# File 'lib/delayed/worker/health_check.rb', line 9

# Class-level reader for the identifier of this health check type.
# +build+ compares this value against the symbolized +type+ it was given.
#
# @return [Object] the current value of @type_name
def type_name
  @type_name
end

Instance Attribute Details

#configObject

Returns the value of attribute config.



75
76
77
# File 'lib/delayed/worker/health_check.rb', line 75

# Reader for the configuration stored by the constructor
# (the constructor stores +config.with_indifferent_access+).
#
# @return [Object] the current value of @config
def config
  @config
end

#worker_nameObject

Returns the value of attribute worker_name.



75
76
77
# File 'lib/delayed/worker/health_check.rb', line 75

# Reader for the worker name given to the constructor.
#
# @return [Object] the current value of @worker_name
def worker_name
  @worker_name
end

Class Method Details

.build(type:, worker_name:, config: {}) ⇒ Object

Raises:

  • (ArgumentError)


17
18
19
20
21
22
23
# File 'lib/delayed/worker/health_check.rb', line 17

# Instantiates the registered health check subclass whose +type_name+
# equals the requested type.
#
# @param type [#to_sym] identifier of the wanted health check type
# @param worker_name [Object] forwarded to the subclass constructor
# @param config [Object] forwarded to the subclass constructor
# @return [Object] a new instance of the matching subclass
# @raise [ArgumentError] when no registered subclass has that type name
def build(type:, worker_name:, config: {})
  wanted = type.to_sym
  implementation = @subclasses.find { |candidate| candidate.type_name == wanted }
  return implementation.new(worker_name: worker_name, config: config) if implementation

  raise ArgumentError, "Unable to build a HealthCheck for type #{wanted}"
end

.inherited(subclass) ⇒ Object



12
13
14
15
# File 'lib/delayed/worker/health_check.rb', line 12

# Inheritance hook: records every new subclass in @subclasses (the list
# that +build+ searches), then defers to the next +inherited+ implementation.
#
# @param subclass [Class] the class that was just derived from this one
def inherited(subclass)
  @subclasses.push(subclass)
  super
end

.reschedule_abandoned_jobsObject



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/delayed/worker/health_check.rb', line 25

# Reschedules jobs that are still marked as running although the worker
# holding them is not reported live by the configured health check.
#
# Does nothing when Settings.worker_health_check_type is :none, or when the
# advisory lock cannot be acquired (another process is already cleaning up).
#
# A failure while rescheduling one job is logged and does not stop the
# remaining jobs from being processed.
def reschedule_abandoned_jobs
  return if Settings.worker_health_check_type == :none

  Delayed::Job.transaction do
    # this action is a special case, and SHOULD NOT be a periodic job
    # because if it gets wiped out suddenly during execution
    # it can't go clean up its abandoned self.  Therefore,
    # we expect it to get run from its own process forked from the job pool
    # and we try to get an advisory lock when it runs.  If we succeed,
    # no other worker is trying to do this right now (and if we abandon the
    # operation, the transaction will end, releasing the advisory lock).
    result = Delayed::Job.attempt_advisory_lock("Delayed::Worker::HealthCheck#reschedule_abandoned_jobs")
    next unless result

    # only jobs locked before this point in time are considered
    horizon = 5.minutes.ago

    checker = Worker::HealthCheck.build(
      type: Settings.worker_health_check_type,
      config: Settings.worker_health_check_config,
      worker_name: "cleanup-crew"
    )
    live_workers = checker.live_workers

    # ids of jobs whose reschedule raised; they are excluded from later
    # batches so a job that fails every time cannot keep this loop running
    failed_ids = []

    loop do
      candidates = Delayed::Job.running_jobs
                               .where("locked_at<?", horizon)
                               .where.not("locked_by LIKE 'prefetch:%'")
                               .where.not(locked_by: live_workers)
      candidates = candidates.where.not(id: failed_ids) unless failed_ids.empty?
      batch = candidates.limit(100).to_a
      break if batch.empty?

      batch.each do |job|
        Delayed::Job.transaction do
          # double check that the job is still there. locked_by will immediately be reset
          # to nil in this transaction by Job#reschedule
          next unless Delayed::Job.where(id: job,
                                         locked_by: job.locked_by)
                                  .update_all(locked_by: "abandoned job cleanup") == 1

          job.reschedule
        end
      rescue => e
        # the rescue must live inside this block: `job` is only in scope here
        failed_ids << job.id
        ::Rails.logger.error "Failure rescheduling abandoned job #{job.id} #{e.inspect}"
      end
    end
  end
end

Instance Method Details

#live_workersObject

Raises:

  • (NotImplementedError)


90
91
92
# File 'lib/delayed/worker/health_check.rb', line 90

# Abstract in this base class: always raises.
# +reschedule_abandoned_jobs+ passes the result of this call to
# +where.not(locked_by: ...)+, so an implementation presumably returns the
# names of workers considered alive -- confirm against the subclasses.
#
# @raise [NotImplementedError] always, in this base implementation
def live_workers
  raise NotImplementedError
end

#startObject

Raises:

  • (NotImplementedError)


82
83
84
# File 'lib/delayed/worker/health_check.rb', line 82

# Abstract in this base class: always raises.
#
# @raise [NotImplementedError] always, in this base implementation
def start
  raise NotImplementedError
end

#stopObject

Raises:

  • (NotImplementedError)


86
87
88
# File 'lib/delayed/worker/health_check.rb', line 86

# Abstract in this base class: always raises.
#
# @raise [NotImplementedError] always, in this base implementation
def stop
  raise NotImplementedError
end