Class: Sidekiq::Superworker::SubjobProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/superworker/subjob_processor.rb

Class Method Summary collapse

Class Method Details

.complete(subjob) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/sidekiq/superworker/subjob_processor.rb', line 72

# Marks a subjob as complete and moves the workflow forward.
#
# When the subjob has children, the first child is enqueued and nil is
# returned. When it has none, the subjob is handed to
# descendants_are_complete and that call's result is returned.
#
# @param subjob [Object] record responding to to_info, update_attribute and children
# @return [Object, nil] nil when a child was enqueued, otherwise the result
#   of descendants_are_complete
def complete(subjob)
  Superworker.debug("#{subjob.to_info}: Complete")
  subjob.update_attribute(:status, 'complete')

  successors = subjob.children

  # No children: this subjob's descendants are all complete
  return descendants_are_complete(subjob) unless successors.present?

  # Children are present, so enqueue the first one
  Superworker.debug("#{subjob.to_info}: Enqueueing children")
  enqueue(successors.first)
  nil
end

.enqueue(subjob) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/sidekiq/superworker/subjob_processor.rb', line 5

# Enqueues a subjob if, and only if, its status is 'initialized'.
#
# Dispatch depends on subjob.subworker_class:
# * 'parallel' - the subjob is marked running and every child is enqueued.
# * 'batch'    - the subjob and each child are marked running, and the first
#                child of every child is enqueued.
# * otherwise  - the name is constantized; a superworker class is completed
#                immediately (which queues its children or next subjob),
#                anything else is marked queued and pushed to Sidekiq.
#
# @param subjob [Object] record responding to to_info, status, subworker_class,
#   children, jid, update_attribute and update_attributes
# @return [Object, nil] the subjob's jid for a plain worker, the first
#   child's result for parallel/batch subjobs, nil otherwise
def enqueue(subjob)
  Superworker.debug("#{subjob.to_info}: Trying to enqueue")

  # Subjobs that are already running, complete, etc. are left alone
  return if subjob.status != 'initialized'

  Superworker.debug("#{subjob.to_info}: Enqueueing")

  worker_name = subjob.subworker_class
  case worker_name
  when 'parallel'
    # A parallel subjob fans out to all of its children at once
    subjob.update_attribute(:status, 'running')
    Superworker.debug("#{subjob.to_info}: Enqueueing parallel children")

    child_jids = subjob.children.map { |child| enqueue(child) }
    child_jids.first
  when 'batch'
    subjob.update_attribute(:status, 'running')
    Superworker.debug("#{subjob.to_info}: Enqueueing batch children")

    child_jids = subjob.children.map do |child|
      child.update_attribute(:status, 'running')
      enqueue(child.children.first)
    end
    child_jids.first
  else
    worker = "::#{worker_name}".constantize

    if worker.respond_to?(:is_a_superworker?) && worker.is_a_superworker?
      # A superworker has no job of its own: completing it queues its
      # children or its next subjob
      complete(subjob)
      nil
    else
      # The JID is read up front and the record is updated before the push.
      # If the job started first, it could finish before the ActiveRecord
      # update transaction completes, causing a race condition when finding
      # the record in Processor#complete.
      job_id = subjob.jid
      subjob.update_attributes(status: 'queued')
      enqueue_in_sidekiq(subjob, worker, job_id)
      job_id
    end
  end
end

.enqueue_in_sidekiq(subjob, klass, jid) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/sidekiq/superworker/subjob_processor.rb', line 51

# Pushes a subjob to Sidekiq, temporarily disabling sidekiq-unique-jobs
# options on the worker class for the duration of the push.
#
# The 'unique' and 'unique_job_expiration' entries are removed from
# klass.sidekiq_options_hash before sidekiq_push is called and put back
# afterwards. Restoration happens in an ensure clause, so a failing push
# cannot leave the worker class permanently stripped of its uniqueness
# settings. Only keys that were actually present are restored, so no
# nil-valued entries are introduced.
#
# @param subjob [Object] record responding to to_info; passed on to sidekiq_push
# @param klass [Object] worker class; its sidekiq_options_hash is consulted
#   when it responds to that method
# @param jid [Object] job id passed on to sidekiq_push
# @return [Object] the jid that was passed in
# @raise whatever sidekiq_push raises (after the options have been restored)
def enqueue_in_sidekiq(subjob, klass, jid)
  Superworker.debug "#{subjob.to_info}: Enqueueing in Sidekiq"

  # If sidekiq-unique-jobs is being used for this worker, a number of issues arise if the subjob isn't
  # queued, so we'll bypass the unique functionality of the worker while running the subjob.
  is_unique = klass.respond_to?(:sidekiq_options_hash) && !!(klass.sidekiq_options_hash || {})['unique']

  stashed = {}
  if is_unique
    options = klass.sidekiq_options_hash
    %w[unique unique_job_expiration].each do |key|
      stashed[key] = options.delete(key) if options.key?(key)
    end
  end

  begin
    sidekiq_push(subjob, klass, jid)
  ensure
    # Always put the removed options back, even when the push raised
    klass.sidekiq_options_hash.update(stashed) unless stashed.empty?
  end

  jid
end

.error(subjob, worker, item, exception) ⇒ Object



88
89
90
91
92
# File 'lib/sidekiq/superworker/subjob_processor.rb', line 88

# Records a subjob failure and reports it to the owning superjob.
#
# The subjob's status is set to 'failed', then the error details are passed
# to SuperjobProcessor.error together with the subjob's superjob_id.
#
# @param subjob [Object] record responding to to_info, update_attribute and superjob_id
# @param worker [Object] passed through to SuperjobProcessor.error
# @param item [Object] passed through to SuperjobProcessor.error
# @param exception [Object] passed through to SuperjobProcessor.error
# @return [Object] whatever SuperjobProcessor.error returns
def error(subjob, worker, item, exception)
  Superworker.debug("#{subjob.to_info}: Error")
  subjob.update_attribute(:status, 'failed')

  owning_superjob_id = subjob.superjob_id
  SuperjobProcessor.error(owning_superjob_id, worker, item, exception)
end