Class: Sidekiq::Superworker::SuperjobProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/superworker/superjob_processor.rb

Class Method Summary collapse

Class Method Details

.complete(superjob_id) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/sidekiq/superworker/superjob_processor.rb', line 33

def self.complete(superjob_id)
  Superworker.debug "Superworker ##{superjob_id}: Complete"

  Subjob.delete_subjobs_for(superjob_id) if Superworker.options[:delete_subjobs_after_superjob_completes]
  
  # Set the superjob Sidekiq::Monitor::Job as being complete
  if defined?(Sidekiq::Monitor)
    job = Sidekiq::Monitor::Job.where(queue: queue_name, jid: superjob_id).first
    if job
      job.update_attributes(
        status: 'complete',
        finished_at: Time.now
      )
    end
  end
end

.create(superjob_id, superworker_class_name, args, subjobs, options = {}) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/sidekiq/superworker/superjob_processor.rb', line 8

def self.create(superjob_id, superworker_class_name, args, subjobs, options={})
  Superworker.debug "Superworker ##{superjob_id}: Create"
  
  options ||= {}
  
  # If sidekiq_monitor is being used, create a Sidekiq::Monitor::Job for the superjob
  if defined?(Sidekiq::Monitor)
    now = Time.now
    Sidekiq::Monitor::Job.create(
      jid: superjob_id,
      queue: queue_name,
      class_name: superworker_class_name,
      args: args,
      enqueued_at: now,
      started_at: now,
      status: 'running',
      name: options[:name]
    )
  end

  # Enqueue the first root-level subjob
  first_subjob = subjobs.find { |subjob| subjob.parent_id.nil? }
  SubjobProcessor.enqueue(first_subjob)
end

.error(superjob_id, worker, item, exception) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/sidekiq/superworker/superjob_processor.rb', line 50

def self.error(superjob_id, worker, item, exception)
  Superworker.debug "Superworker ##{superjob_id}: Error"

  if defined?(Sidekiq::Monitor)
    job = Sidekiq::Monitor::Job.where(queue: queue_name, jid: superjob_id).first
    if job
      result = {
        message: "#{exception.message} (thrown in #{worker.class}, JID: #{item['jid']})",
        backtrace: exception.backtrace
      }
      job.update_attributes(
        finished_at: DateTime.now,
        status: 'failed',
        result: result
      )
    end
  end
end

.queue_nameObject



4
5
6
# File 'lib/sidekiq/superworker/superjob_processor.rb', line 4

def self.queue_name
  :superworker
end