Class: Delayed::Worker::HealthCheck

Inherits:
Object
  • Object
show all
Defined in:
lib/delayed/worker/health_check.rb

Direct Known Subclasses

ConsulHealthCheck, NullHealthCheck

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(worker_name:, config: {}) ⇒ HealthCheck

Returns a new instance of HealthCheck.


70
71
72
73
# File 'lib/delayed/worker/health_check.rb', line 70

# Creates a health check bound to one worker.
#
# @param worker_name [Object] identifier stored unchanged in @worker_name
# @param config [#with_indifferent_access] settings for the check; defaults
#   to an empty hash and is stored after calling with_indifferent_access on it
def initialize(worker_name:, config: {})
  @worker_name = worker_name
  @config = config.with_indifferent_access
end

Class Attribute Details

.subclasses ⇒ Object (readonly)

Returns the value of attribute subclasses


10
11
12
# File 'lib/delayed/worker/health_check.rb', line 10

# Reader for the class-level @subclasses instance variable: the collection
# that .inherited appends each new subclass to and that .build searches by
# type_name.
#
# @return [Object] the current value of @subclasses
def subclasses
  @subclasses
end

.type_name ⇒ Object

Returns the value of attribute type_name


9
10
11
# File 'lib/delayed/worker/health_check.rb', line 9

# Reader for the class-level @type_name instance variable. .build compares
# this value against the requested type after converting that type with
# to_sym, so it is presumably a Symbol -- confirm against the subclasses.
#
# @return [Object] the current value of @type_name
def type_name
  @type_name
end

Instance Attribute Details

#config ⇒ Object

Returns the value of attribute config


68
69
70
# File 'lib/delayed/worker/health_check.rb', line 68

# Reader for @config, which #initialize sets to the result of calling
# with_indifferent_access on the config it was given.
#
# @return [Object] the stored configuration
def config
  @config
end

#worker_name ⇒ Object

Returns the value of attribute worker_name


68
69
70
# File 'lib/delayed/worker/health_check.rb', line 68

# Reader for @worker_name, stored unchanged from the worker_name: argument
# passed to #initialize.
#
# @return [Object] the stored worker name
def worker_name
  @worker_name
end

Class Method Details

.attempt_advisory_lock ⇒ Object


61
62
63
64
65
# File 'lib/delayed/worker/health_check.rb', line 61

# Asks the database for an advisory lock via pg_try_advisory_xact_lock,
# keyed on a fixed name for the abandoned-job cleanup.
#
# The lock key is produced by a database-side half_md5_as_bigint function
# (presumably installed by the project's schema -- confirm). The function
# name is passed through the connection's quote_table_name before being
# placed in the SQL.
#
# @return [Object] whatever select_value returns for the query; the caller
#   treats it as truthy when the lock was obtained
def attempt_advisory_lock
  connection = ActiveRecord::Base.connection
  hash_function = connection.quote_table_name('half_md5_as_bigint')
  lock_key = "Delayed::Worker::HealthCheck#reschedule_abandoned_jobs"
  connection.select_value("SELECT pg_try_advisory_xact_lock(#{hash_function}('#{lock_key}'));")
end

.build(type:, worker_name:, config: {}) ⇒ Object

Raises:

  • (ArgumentError)

16
17
18
19
20
21
# File 'lib/delayed/worker/health_check.rb', line 16

# Instantiates the registered subclass whose type_name matches +type+.
#
# @param type [#to_sym] requested health check type; converted with to_sym
#   before being compared to each subclass's type_name
# @param worker_name [Object] forwarded to the subclass constructor
# @param config [Object] forwarded to the subclass constructor; defaults to {}
# @return [Object] a new instance of the matching subclass
# @raise [ArgumentError] when no entry in @subclasses has a matching type_name
def build(type:, worker_name:, config: {})
  wanted = type.to_sym
  health_check_class = @subclasses.find { |candidate| candidate.type_name == wanted }
  return health_check_class.new(worker_name: worker_name, config: config) if health_check_class

  raise ArgumentError, "Unable to build a HealthCheck for type #{wanted}"
end

.inherited(subclass) ⇒ Object


12
13
14
# File 'lib/delayed/worker/health_check.rb', line 12

# Hook that Ruby invokes on a class when a direct subclass of it is defined.
# Records the new subclass in @subclasses, which .build later searches by
# type_name.
#
# super is called first so that any other `inherited` hooks further along
# the method lookup chain still run; without it this override would
# silently cut them off.
#
# @param subclass [Class] the class that was just defined
# @return [Object] the result of appending to @subclasses
def inherited(subclass)
  super
  @subclasses << subclass
end

.reschedule_abandoned_jobs ⇒ Object


23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/delayed/worker/health_check.rb', line 23

# Reschedules running jobs whose locked_by value is not in the list of live
# workers reported by the configured health check.
#
# Does nothing when Settings.worker_health_check_type is :none, or when the
# advisory lock cannot be obtained. A failure while rescheduling one job is
# logged through Rails.logger and does not stop the remaining jobs from
# being processed.
def reschedule_abandoned_jobs
  return if Settings.worker_health_check_type == :none

  Delayed::Job.transaction do
    # This action is a special case and SHOULD NOT be a periodic job:
    # if it were wiped out suddenly during execution it could not clean up
    # its own abandoned self. It is therefore expected to run from its own
    # process forked from the job pool, and it tries to take an advisory
    # lock when it runs. Holding the lock means no other worker is doing
    # this right now; if the operation is abandoned, the transaction ends
    # and the advisory lock is released.
    return unless attempt_advisory_lock

    health_check = Worker::HealthCheck.build(
      type: Settings.worker_health_check_type,
      config: Settings.worker_health_check_config,
      worker_name: 'cleanup-crew'
    )
    alive = health_check.live_workers

    Delayed::Job.running_jobs.each do |job|
      # prefetched jobs have their own way of automatically unlocking themselves
      next if job.locked_by.start_with?("prefetch:")
      next if alive.include?(job.locked_by)

      begin
        Delayed::Job.transaction do
          # Double check that the job is still there and still held by the
          # same locker. locked_by will immediately be reset to nil in this
          # transaction by Job#reschedule.
          claimed = Delayed::Job.where(id: job, locked_by: job.locked_by)
                                .update_all(locked_by: "abandoned job cleanup")
          job.reschedule if claimed == 1
        end
      rescue => error
        ::Rails.logger.error "Failure rescheduling abandoned job #{job.id} #{error.inspect}"
      end
    end
  end
end

Instance Method Details

#live_workers ⇒ Object

Raises:

  • (NotImplementedError)

83
84
85
# File 'lib/delayed/worker/health_check.rb', line 83

# Placeholder in the base class: always raises. Concrete health checks are
# presumably expected to override it (confirm against the subclasses).
# .reschedule_abandoned_jobs calls include? on the returned value with each
# job's locked_by, so an override should return a collection of worker
# identifiers.
#
# Note that NotImplementedError descends from ScriptError rather than
# StandardError, so a bare `rescue` will not catch it.
#
# @raise [NotImplementedError] always, in this base implementation
def live_workers
  raise NotImplementedError
end

#start ⇒ Object

Raises:

  • (NotImplementedError)

75
76
77
# File 'lib/delayed/worker/health_check.rb', line 75

# Placeholder in the base class: always raises. Concrete health checks are
# presumably expected to override it with their start-up behaviour
# (confirm against the subclasses).
#
# @raise [NotImplementedError] always, in this base implementation
def start
  raise NotImplementedError
end

#stop ⇒ Object

Raises:

  • (NotImplementedError)

79
80
81
# File 'lib/delayed/worker/health_check.rb', line 79

# Placeholder in the base class: always raises. Concrete health checks are
# presumably expected to override it with their shut-down behaviour
# (confirm against the subclasses).
#
# @raise [NotImplementedError] always, in this base implementation
def stop
  raise NotImplementedError
end