Class: CanvasSync::JobBatches::Pool

Inherits:
Object
  • Object
show all
Includes:
RedisModel
Defined in:
lib/canvas_sync/job_batches/pool.rb

Constant Summary collapse

# RedisScript wrapper around the hincr_max.lua file that sits in the same
# directory as this source file ("../" steps from the file up to its directory).
HINCR_MAX = RedisScript.new(Pathname.new(__FILE__).join("../hincr_max.lua"))

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from RedisModel

#persist_bid_attr, #read_bid_attr

Constructor Details

#initialize(pooolid = nil, **kwargs) ⇒ Pool

Returns a new instance of Pool.



16
17
18
19
20
21
22
23
24
# File 'lib/canvas_sync/job_batches/pool.rb', line 16

# Builds a handle for a job pool.
#
# When +pooolid+ is given the instance is flagged as existing and adopts that
# id; +kwargs+ are not consulted on that path. Otherwise a random URL-safe id
# is generated and +kwargs+ are forwarded to #initialize_new.
#
# @param pooolid [Object, nil] id of a pool to attach to (presumably a String);
#   nil or false creates a new pool
# @param kwargs [Hash] options handed to #initialize_new for a new pool
def initialize(pooolid = nil, **kwargs)
  unless pooolid
    # Assign the id first so it is already set when #initialize_new runs.
    @pid = SecureRandom.urlsafe_base64(10)
    initialize_new(**kwargs)
    return
  end

  @existing = true
  @pid = pooolid
end

Instance Attribute Details

#pid ⇒ Object (readonly)

Returns the value of attribute pid.



8
9
10
# File 'lib/canvas_sync/job_batches/pool.rb', line 8

# Reader for the pool id stored in @pid, which #initialize sets either from
# the id it was given or from a freshly generated random token.
def pid
  @pid
end

Class Method Details

.from_pid(pid) ⇒ Object



26
27
28
29
# File 'lib/canvas_sync/job_batches/pool.rb', line 26

# Builds a pool object for an id that must be supplied.
#
# @param pid [Object] the pool id; rejected when blank
# @return [Object] whatever +new(pid)+ returns
# @raise [RuntimeError] "PID must be given" when +pid+ is blank
def self.from_pid(pid)
  raise "PID must be given" if pid.blank?

  new(pid)
end

.job_checked_in(status, options) ⇒ Object



102
103
104
105
# File 'lib/canvas_sync/job_batches/pool.rb', line 102

# Class-level check-in hook: resolves the pool named by +options['pool_id']+
# and forwards the call to that instance's #job_checked_in.
#
# @param status [Object] passed through untouched
# @param options [Hash] must carry the pool id under the string key 'pool_id'
# @return [Object] the result of the instance-level #job_checked_in
def self.job_checked_in(status, options)
  pool = from_pid(options['pool_id'])
  pool.job_checked_in(status, options)
end

Instance Method Details

#<<(job_desc) ⇒ Object



31
32
33
# File 'lib/canvas_sync/job_batches/pool.rb', line 31

# Shovel-style shorthand for adding a single job description to the pool.
#
# @param job_desc [Hash] one job description
# @return [Object] whatever #add_jobs returns (not the pool itself)
def <<(job_desc)
  add_jobs([job_desc])
end

#active_count ⇒ Object



69
70
71
72
73
# File 'lib/canvas_sync/job_batches/pool.rb', line 69

# Reads the pool's "active_count" field from its Redis hash.
#
# Uses a plain HGET rather than HINCRBY-by-zero: HINCRBY is a write command
# that creates the hash (and the field) when it is absent, so merely reading
# the counter could recreate the key of a pool that had already been removed.
#
# @return [Integer] the stored counter, or 0 when the hash or field is missing
def active_count
  redis do |r|
    # HGET yields a String or nil; nil.to_i is 0, the same value HINCRBY 0
    # reported for a missing field.
    r.hget(redis_key, "active_count").to_i
  end
end

#add_job(job_desc) ⇒ Object



35
36
37
# File 'lib/canvas_sync/job_batches/pool.rb', line 35

# Adds a single job description by handing #add_jobs a one-element list.
#
# @param job_desc [Hash] one job description
# @return [Object] whatever #add_jobs returns
def add_job(job_desc)
  # Wrap literally; Array(job_desc) would explode a Hash into key/value pairs.
  single_batch = [job_desc]
  add_jobs(single_batch)
end

#add_jobs(job_descs) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/canvas_sync/job_batches/pool.rb', line 39

# Queues each job description in the pool, then tops up the allotment.
#
# For every description a fresh, empty Batch is created whose callback points
# back at this class's +job_checked_in+ with the pool id. The description is
# copied into an indifferent-access hash, its :job entry is stringified, the
# batch's bid is stored under :pool_wrapper_batch, and the result is handed to
# #push_job_to_pool.
#
# @param job_descs [Enumerable<Hash>] job descriptions, each with a :job entry
# @return [Object] whatever #refill_allotment returns
def add_jobs(job_descs)
  job_descs.each do |raw_desc|
    checkin_batch = Batch.new
    checkin_batch.description = "Pool Job Wrapper (PID: #{pid})"

    # With on_failed_job == :wait the callback is bound to :success,
    # otherwise to :complete.
    event = if on_failed_job == :wait
              :success
            else
              :complete
            end
    checkin_batch.on(event, "#{self.class}.job_checked_in", pool_id: pid)
    # Run the batch with an empty jobs block; nothing is enqueued inside it here.
    checkin_batch.jobs {}

    entry = raw_desc.with_indifferent_access
    entry[:job] = entry[:job].to_s
    entry[:pool_wrapper_batch] = checkin_batch.bid

    push_job_to_pool(entry)
  end

  refill_allotment
end

#cleanup_redis ⇒ Object



58
59
60
61
62
63
64
65
66
67
# File 'lib/canvas_sync/job_batches/pool.rb', line 58

# Removes this pool's data from Redis: its entry in the "pools" sorted set,
# its own key, and the companion "<key>-jobs" key.
#
# @return [Object] the result of the Redis block, i.e. of the UNLINK call
def cleanup_redis
  Batch.logger.debug { "Cleaning redis of pool #{pid}" }

  redis do |conn|
    conn.zrem("pools", pid)

    pool_key = redis_key.to_s
    conn.unlink(pool_key, "#{pool_key}-jobs")
  end
end

#job_checked_in(status, options) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
# File 'lib/canvas_sync/job_batches/pool.rb', line 90

# Records that one pooled job has checked in.
#
# Decrements the pool's "active_count", refills the allotment, and, when the
# counter reached zero and nothing new was added, cleans the pool out of Redis
# if +clean_when_empty+ is set. Does nothing when the pool's key is absent.
#
# @param status [Object] not used by this method
# @param options [Hash] not used by this method
# @return [Object, nil] the result of #cleanup_redis when it ran, otherwise nil
def job_checked_in(status, options)
  remaining = redis do |conn|
    # Leave the whole method early when the pool's key is not in Redis.
    return unless conn.exists?(redis_key)

    conn.hincrby(redis_key, "active_count", -1)
  end

  refilled = refill_allotment
  return unless remaining == 0 && refilled == 0

  cleanup_redis if clean_when_empty
end

#pending_count ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/canvas_sync/job_batches/pool.rb', line 75

# Counts the entries waiting under the pool's "<key>-jobs" Redis key.
#
# The size command depends on the pool's order (defaulting to 'fifo' when
# none is set): LLEN for fifo/lifo, SCARD for random, ZCARD for priority.
#
# @return [Object, nil] the result of the size command, or nil for any other order
def pending_count
  queue_key = "#{redis_key}-jobs"
  strategy = (order || 'fifo').to_sym

  redis do |conn|
    if %i[fifo lifo].include?(strategy)
      conn.llen(queue_key)
    elsif strategy == :random
      conn.scard(queue_key)
    elsif strategy == :priority
      conn.zcard(queue_key)
    end
  end
end