Class: CanvasSync::JobBatches::ManagedBatchJob

Inherits:
BaseJob
  • Object
show all
Defined in:
lib/canvas_sync/job_batches/jobs/managed_batch_job.rb

Defined Under Namespace

Classes: ManagedBatchProxy

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.cleanup_redis(status, options) ⇒ Object



71
72
73
74
75
76
77
78
79
# File 'lib/canvas_sync/job_batches/jobs/managed_batch_job.rb', line 71

def self.cleanup_redis(status, options)
  man_batch_id = options['managed_batch_id']
  Batch.redis do |r|
    r.del(
      "MNGBID-#{man_batch_id}",
      "MNGBID-#{man_batch_id}-jobs",
    )
  end
end

.job_succeeded_callback(status, options) ⇒ Object



81
82
83
84
# File 'lib/canvas_sync/job_batches/jobs/managed_batch_job.rb', line 81

def self.job_succeeded_callback(status, options)
  man_batch_id = options['managed_batch_id']
  perform_next_sequence_job(man_batch_id)
end

.make_batch(sub_jobs, ordered: true, concurrency: nil, context: nil, desc_prefix: nil, &blk) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/canvas_sync/job_batches/jobs/managed_batch_job.rb', line 6

def self.make_batch(sub_jobs, ordered: true, concurrency: nil, context: nil, desc_prefix: nil, &blk)
  desc_prefix ||= ''

  if concurrency == 0 || concurrency == nil || concurrency == true
    concurrency = sub_jobs.count
  elsif concurrency == false
    concurrency = 1
  end

  root_batch = Batch.new

  if concurrency < sub_jobs.count
    man_batch_id = SecureRandom.urlsafe_base64(10)

    Batch.redis do |r|
      r.multi do |r|
        r.hset("MNGBID-#{man_batch_id}", "root_bid", root_batch.bid)
        r.hset("MNGBID-#{man_batch_id}", "ordered", ordered)
        r.hset("MNGBID-#{man_batch_id}", "concurrency", concurrency)
        r.expire("MNGBID-#{man_batch_id}", Batch::BID_EXPIRE_TTL)

        mapped_sub_jobs = sub_jobs.each_with_index.map do |j, i|
          j['_mngbid_index_'] = i # This allows duplicate jobs when a Redis Set is used
          j = ::ActiveJob::Arguments.serialize([j])
          JSON.unparse(j)
        end
        if ordered
          r.rpush("MNGBID-#{man_batch_id}-jobs", mapped_sub_jobs)
        else
          r.sadd("MNGBID-#{man_batch_id}-jobs", mapped_sub_jobs)
        end
        r.expire("MNGBID-#{man_batch_id}-jobs", Batch::BID_EXPIRE_TTL)
      end
    end

    root_batch.allow_context_changes = (concurrency == 1)
    root_batch.on(:success, "#{to_s}.cleanup_redis", managed_batch_id: man_batch_id)

    desc_prefix = "MGD(#{man_batch_id}): #{desc_prefix}"
  end

  root_batch.context = context

  blk.call(ManagedBatchProxy.new(root_batch)) if blk.present?

  root_batch.description = "#{desc_prefix}: #{root_batch.description || 'Root'}"

  if concurrency < sub_jobs.count
    root_batch.jobs {}
    concurrency.times do
      perform_next_sequence_job(man_batch_id)
    end
  else
    root_batch.jobs do
      sub_jobs.each do |j|
        ChainBuilder.enqueue_job(j)
      end
    end
  end
end

Instance Method Details

#perform(sub_jobs, context: nil, ordered: true, concurrency: nil) ⇒ Object



67
68
69
# File 'lib/canvas_sync/job_batches/jobs/managed_batch_job.rb', line 67

def perform(sub_jobs, context: nil, ordered: true, concurrency: nil)
  self.class.make_batch(sub_jobs, ordered: ordered, concurrency: concurrency, context: context)
end