Class: Sidekiq::Superworker::SubjobProcessor
- Inherits: Object
- Inheritance hierarchy: Object → Sidekiq::Superworker::SubjobProcessor
- Defined in: lib/sidekiq/superworker/subjob_processor.rb
Class Method Summary
- .complete(subjob) ⇒ Object
- .enqueue(subjob) ⇒ Object
- .enqueue_in_sidekiq(subjob, klass, jid) ⇒ Object
- .error(subjob, worker, item, exception) ⇒ Object
Class Method Details
.complete(subjob) ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/sidekiq/superworker/subjob_processor.rb', line 72
#
# Marks a subjob as complete and moves the workflow forward.
#
# The subjob's status is set to 'complete'. If the subjob has children, the
# first child is enqueued; otherwise the subjob is handed to
# descendants_are_complete.
#
# @param subjob the subjob record that has finished
# @return [nil] when a child was enqueued; otherwise whatever
#   descendants_are_complete returns
def complete(subjob)
  Superworker.debug "#{subjob.to_info}: Complete"
  subjob.update_attribute(:status, 'complete')

  children = subjob.children

  # No children: this subjob's whole subtree is finished
  return descendants_are_complete(subjob) unless children.present?

  # Children exist: start with the first one
  Superworker.debug "#{subjob.to_info}: Enqueueing children"
  enqueue(children.first)
  nil
end
.enqueue(subjob) ⇒ Object
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/sidekiq/superworker/subjob_processor.rb', line 5
#
# Enqueues a subjob according to its subworker_class.
#
# Nothing happens unless the subjob's status is 'initialized'.
# - 'parallel': the subjob is marked 'running' and every child is enqueued.
# - 'batch': the subjob and each child are marked 'running', and the first
#   child of every child is enqueued.
# - anything else: subworker_class is constantized. If the class reports
#   is_a_superworker?, the subjob is passed to complete; otherwise it is
#   marked 'queued' and pushed via enqueue_in_sidekiq.
#
# @param subjob the subjob record to enqueue
# @return the first JID produced by the children (parallel/batch), the
#   subjob's own JID (plain worker), or nil (not 'initialized', or a
#   superworker class)
def enqueue(subjob)
  Superworker.debug "#{subjob.to_info}: Trying to enqueue"

  # Subjobs that are already running, complete, etc. are left alone
  return unless subjob.status == 'initialized'

  Superworker.debug "#{subjob.to_info}: Enqueueing"

  case subjob.subworker_class
  when 'parallel'
    # A parallel subjob fans out to all of its children
    subjob.update_attribute(:status, 'running')
    Superworker.debug "#{subjob.to_info}: Enqueueing parallel children"
    subjob.children.map { |child| enqueue(child) }.first
  when 'batch'
    subjob.update_attribute(:status, 'running')
    Superworker.debug "#{subjob.to_info}: Enqueueing batch children"
    batch_jids = subjob.children.map do |child|
      child.update_attribute(:status, 'running')
      enqueue(child.children.first)
    end
    batch_jids.first
  else
    klass = "::#{subjob.subworker_class}".constantize

    if klass.respond_to?(:is_a_superworker?) && klass.is_a_superworker?
      # A superworker is marked complete, which queues its children or its next subjob
      complete(subjob)
      nil
    else
      # The JID is set explicitly so the ActiveRecord record can be updated before the job
      # fires off. If the job started first, it could finish before the ActiveRecord update
      # transaction completes, causing a race condition when finding the ActiveRecord record
      # in Processor#complete.
      own_jid = subjob.jid
      subjob.update_attributes(status: 'queued')
      enqueue_in_sidekiq(subjob, klass, own_jid)
      own_jid
    end
  end
end
.enqueue_in_sidekiq(subjob, klass, jid) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/sidekiq/superworker/subjob_processor.rb', line 51
#
# Pushes a subjob to Sidekiq via sidekiq_push, bypassing the worker's
# 'unique' options for the duration of the push.
#
# @param subjob the subjob record being enqueued
# @param klass the worker class the subjob runs
# @param jid the job ID to push the subjob under
# @return the jid that was passed in
# @raise whatever sidekiq_push raises; the worker's 'unique' options are
#   restored before the exception propagates
def enqueue_in_sidekiq(subjob, klass, jid)
  Superworker.debug "#{subjob.to_info}: Enqueueing in Sidekiq"

  # If sidekiq-unique-jobs is being used for this worker, a number of issues arise if the subjob isn't
  # queued, so we'll bypass the unique functionality of the worker while running the subjob.
  is_unique = klass.respond_to?(:sidekiq_options_hash) && !!(klass.sidekiq_options_hash || {})['unique']

  unless is_unique
    sidekiq_push(subjob, klass, jid)
    return jid
  end

  unique_value = klass.sidekiq_options_hash.delete('unique')
  unique_job_expiration_value = klass.sidekiq_options_hash.delete('unique_job_expiration')

  begin
    sidekiq_push(subjob, klass, jid)
  ensure
    # Always put the options back, even if the push raised; otherwise the
    # worker class would stay stripped of its uniqueness settings.
    klass.sidekiq_options_hash['unique'] = unique_value
    klass.sidekiq_options_hash['unique_job_expiration'] = unique_job_expiration_value
  end

  jid
end
.error(subjob, worker, item, exception) ⇒ Object
88 89 90 91 92 |
# File 'lib/sidekiq/superworker/subjob_processor.rb', line 88
#
# Records a subjob failure and forwards it to the owning superjob.
#
# The subjob's status is set to 'failed', then SuperjobProcessor.error is
# called with the subjob's superjob_id and the remaining arguments.
#
# @param subjob the subjob record that failed
# @param worker passed through to SuperjobProcessor.error
# @param item passed through to SuperjobProcessor.error
# @param exception passed through to SuperjobProcessor.error
# @return whatever SuperjobProcessor.error returns
def error(subjob, worker, item, exception)
  Superworker.debug "#{subjob.to_info}: Error"
  subjob.update_attribute(:status, 'failed')

  owning_superjob_id = subjob.superjob_id
  SuperjobProcessor.error(owning_superjob_id, worker, item, exception)
end