Class: CanvasSync::JobBatches::ManagedBatchJob

Inherits:
BaseJob
  • Object
show all
Defined in:
lib/canvas_sync/job_batches/jobs/managed_batch_job.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.cleanup_redis(status, options) ⇒ Object



49
50
51
52
53
54
55
56
57
# File 'lib/canvas_sync/job_batches/jobs/managed_batch_job.rb', line 49

def self.cleanup_redis(status, options)
  man_batch_id = options['managed_batch_id']
  Batch.redis do |r|
    r.del(
      "MNGBID-#{man_batch_id}",
      "MNGBID-#{man_batch_id}-jobs",
    )
  end
end

.job_succeeded_callback(status, options) ⇒ Object



59
60
61
62
# File 'lib/canvas_sync/job_batches/jobs/managed_batch_job.rb', line 59

def self.job_succeeded_callback(status, options)
  man_batch_id = options['managed_batch_id']
  perform_next_sequence_job(man_batch_id)
end

Instance Method Details

#perform(sub_jobs, context: nil, ordered: true, concurrency: nil) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/canvas_sync/job_batches/jobs/managed_batch_job.rb', line 6

def perform(sub_jobs, context: nil, ordered: true, concurrency: nil)
  man_batch_id = SecureRandom.urlsafe_base64(10)

  if concurrency == 0 || concurrency == nil || concurrency == true
    concurrency = sub_jobs.count
  elsif concurrency == false
    concurrency = 1
  end

  root_batch = Batch.new

  Batch.redis do |r|
    r.multi do
      r.hset("MNGBID-#{man_batch_id}", "root_bid", root_batch.bid)
      r.hset("MNGBID-#{man_batch_id}", "ordered", ordered)
      r.hset("MNGBID-#{man_batch_id}", "concurrency", concurrency)
      r.expire("MNGBID-#{man_batch_id}", Batch::BID_EXPIRE_TTL)

      mapped_sub_jobs = sub_jobs.each_with_index.map do |j, i|
        j['_mngbid_index_'] = i # This allows duplicate jobs when a Redis Set is used
        j = ActiveJob::Arguments.serialize([j])
        JSON.unparse(j)
      end
      if ordered
        r.rpush("MNGBID-#{man_batch_id}-jobs", mapped_sub_jobs)
      else
        r.sadd("MNGBID-#{man_batch_id}-jobs", mapped_sub_jobs)
      end
      r.expire("MNGBID-#{man_batch_id}-jobs", Batch::BID_EXPIRE_TTL)
    end
  end

  root_batch.description = "Managed Batch Root (#{man_batch_id})"
  root_batch.allow_context_changes = (concurrency == 1)
  root_batch.context = context
  root_batch.on(:success, "#{self.class.to_s}.cleanup_redis", managed_batch_id: man_batch_id)
  root_batch.jobs {}

  concurrency.times do
    self.class.perform_next_sequence_job(man_batch_id)
  end
end