Class: Delayed::Worker::HealthCheck

Inherits:
Object
  • Object
show all
Defined in:
lib/delayed/worker/health_check.rb

Direct Known Subclasses

ConsulHealthCheck, NullHealthCheck

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(worker_name:, config: {}) ⇒ HealthCheck

Returns a new instance of HealthCheck.



69
70
71
72
# File 'lib/delayed/worker/health_check.rb', line 69

# Sets up a health check for a single worker.
#
# @param worker_name the name stored in @worker_name (exposed via #worker_name)
# @param config [#with_indifferent_access] settings for the check; the value
#   returned by +with_indifferent_access+ is what gets stored in @config
def initialize(worker_name:, config: {})
  @worker_name = worker_name
  @config = config.with_indifferent_access
end

Class Attribute Details

.subclasses ⇒ Object (readonly)

Returns the value of attribute subclasses.



10
11
12
# File 'lib/delayed/worker/health_check.rb', line 10

# Reader for the class-level @subclasses collection: the same object that
# `inherited` appends each new subclass to and that `build` searches.
def subclasses
  @subclasses
end

.type_name ⇒ Object

Returns the value of attribute type_name.



9
10
11
# File 'lib/delayed/worker/health_check.rb', line 9

# Reader for the class-level @type_name. `build` compares this value with the
# requested type after converting the request with `to_sym`.
def type_name
  @type_name
end

Instance Attribute Details

#config ⇒ Object

Returns the value of attribute config.



67
68
69
# File 'lib/delayed/worker/health_check.rb', line 67

# Reader for @config, which the constructor sets to the result of calling
# `with_indifferent_access` on the config it was given.
def config
  @config
end

#worker_name ⇒ Object

Returns the value of attribute worker_name.



67
68
69
# File 'lib/delayed/worker/health_check.rb', line 67

# Reader for @worker_name, stored as-is from the constructor's `worker_name:` argument.
def worker_name
  @worker_name
end

Class Method Details

.attempt_advisory_lock ⇒ Object



60
61
62
63
64
# File 'lib/delayed/worker/health_check.rb', line 60

# Asks the database for the advisory lock that guards abandoned-job cleanup,
# via `pg_try_advisory_xact_lock`. The lock key is produced in SQL by passing a
# fixed lock name to the `half_md5_as_bigint` function.
#
# @return whatever `select_value` yields for the query; the caller treats a
#   falsy result as "lock not acquired"
def attempt_advisory_lock
  connection = ActiveRecord::Base.connection
  hash_function = connection.quote_table_name('half_md5_as_bigint')
  lock_key = "Delayed::Worker::HealthCheck#reschedule_abandoned_jobs"
  sql = "SELECT pg_try_advisory_xact_lock(#{hash_function}('#{lock_key}'));"
  connection.select_value(sql)
end

.build(type:, worker_name:, config: {}) ⇒ Object

Raises:

  • (ArgumentError)


16
17
18
19
20
21
# File 'lib/delayed/worker/health_check.rb', line 16

# Instantiates the registered health check whose `type_name` matches +type+.
#
# @param type [#to_sym] requested check type; converted with `to_sym` before matching
# @param worker_name forwarded unchanged to the matching class's constructor
# @param config forwarded unchanged to the matching class's constructor (defaults to {})
# @return a new instance of the first entry in @subclasses whose `type_name` equals the type
# @raise [ArgumentError] when no entry in @subclasses matches
def build(type:, worker_name:, config: {})
  wanted = type.to_sym
  match = @subclasses.find { |candidate| candidate.type_name == wanted }
  return match.new(worker_name: worker_name, config: config) if match

  raise ArgumentError, "Unable to build a HealthCheck for type #{wanted}"
end

.inherited(subclass) ⇒ Object



12
13
14
# File 'lib/delayed/worker/health_check.rb', line 12

# Ruby's class-inheritance hook: records each newly defined subclass in
# @subclasses so that `build` can later look it up by `type_name`.
#
# @param subclass [Class] the class that was just defined
# @return the @subclasses collection after the append
def inherited(subclass)
  # Chain to the next `inherited` implementation first so other hooks in the
  # ancestry still fire; the registry append remains the return value.
  super
  @subclasses << subclass
end

.reschedule_abandoned_jobs ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/delayed/worker/health_check.rb', line 23

# Finds running jobs whose `locked_by` worker is not reported as live by the
# configured health check and reschedules them.
#
# Does nothing when the configured health check type is :none, or when the
# advisory lock cannot be obtained.
def reschedule_abandoned_jobs
  return if Settings.worker_health_check_type == :none

  Delayed::Job.transaction do
    # This job is a special case and is deliberately not a singleton: if it
    # were wiped out mid-run it could not clean up its own abandoned self.
    # Instead we try for an advisory lock. Holding it means nobody else is
    # doing this work right now, and if this job is abandoned the transaction
    # ends, which releases the lock.
    return unless attempt_advisory_lock

    checker = Worker::HealthCheck.build(
      type: Settings.worker_health_check_type,
      config: Settings.worker_health_check_config,
      worker_name: 'cleanup-crew'
    )
    live_workers = checker.live_workers

    Delayed::Job.running_jobs.each do |job|
      owner = job.locked_by
      # Prefetched jobs have their own way of unlocking themselves.
      next if owner.start_with?("prefetch:")
      next if live_workers.include?(owner)

      begin
        Delayed::Job.transaction do
          # Re-check that the job is still held by the same owner. The marker
          # written here is immediately reset to nil by Job#reschedule within
          # this transaction.
          claimed = Delayed::Job.where(id: job, locked_by: owner).update_all(locked_by: "abandoned job cleanup")
          job.reschedule if claimed == 1
        end
      rescue => e
        ::Rails.logger.error "Failure rescheduling abandoned job #{job.id} #{e.inspect}"
      end
    end
  end
end

Instance Method Details

#live_workers ⇒ Object

Raises:

  • (NotImplementedError)


82
83
84
# File 'lib/delayed/worker/health_check.rb', line 82

# Unimplemented in this class: always raises NotImplementedError.
# `reschedule_abandoned_jobs` calls this on the checker it builds and then
# calls `include?` on the result with each job's `locked_by` value.
# NOTE(review): presumably concrete subclasses override this — confirm.
#
# @raise [NotImplementedError] always
def live_workers
  raise NotImplementedError
end

#start ⇒ Object

Raises:

  • (NotImplementedError)


74
75
76
# File 'lib/delayed/worker/health_check.rb', line 74

# Unimplemented in this class: always raises NotImplementedError.
# NOTE(review): presumably concrete subclasses override this — confirm.
#
# @raise [NotImplementedError] always
def start
  raise NotImplementedError
end

#stop ⇒ Object

Raises:

  • (NotImplementedError)


78
79
80
# File 'lib/delayed/worker/health_check.rb', line 78

# Unimplemented in this class: always raises NotImplementedError.
# NOTE(review): presumably concrete subclasses override this — confirm.
#
# @raise [NotImplementedError] always
def stop
  raise NotImplementedError
end