Class: CanvasSync::JobBatches::Pool

Inherits:
Object
  • Object
show all
Includes:
RedisModel
Defined in:
lib/canvas_sync/job_batches/pool.rb

Constant Summary collapse

# Redis script wrapper built from the `hincr_max.lua` file that sits next to
# this source file (`__FILE__ + "../hincr_max.lua"` steps from the file to its
# directory and then to the Lua file).
# NOTE(review): the script's behaviour is not visible here — see hincr_max.lua.
HINCR_MAX =
RedisScript.new(Pathname.new(__FILE__) + "../hincr_max.lua")

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from RedisModel

#persist_bid_attr, #read_bid_attr

Constructor Details

#initialize(pooolid = nil, **kwargs) ⇒ Pool

Returns a new instance of Pool.



16
17
18
19
20
21
22
23
24
# File 'lib/canvas_sync/job_batches/pool.rb', line 16

# Builds a pool handle.
#
# Without an id, a random URL-safe id is generated and the keyword arguments
# are forwarded to #initialize_new. With an id, the instance simply adopts it
# and is flagged as existing; the keyword arguments are ignored in that case.
#
# @param pooolid [Object, nil] id of an already-created pool, or nil for a new one
# @param kwargs [Hash] settings passed to #initialize_new (new pools only)
def initialize(pooolid = nil, **kwargs)
  unless pooolid
    @pid = SecureRandom.urlsafe_base64(10)
    initialize_new(**kwargs)
    return
  end

  # Attaching to a pool that was created earlier: no setup is performed.
  @existing = true
  @pid = pooolid
end

Instance Attribute Details

#pid ⇒ Object (readonly)

Returns the value of attribute pid.



8
9
10
# File 'lib/canvas_sync/job_batches/pool.rb', line 8

# Reader for the pool id held in @pid.
#
# @return [Object] the value of @pid
def pid
  @pid
end

Class Method Details

.from_pid(pid) ⇒ Object



26
27
28
29
# File 'lib/canvas_sync/job_batches/pool.rb', line 26

# Builds a pool instance for an already-known pool id.
#
# @param pid [Object] the pool id; must satisfy `present?`
# @return [Object] the result of `new(pid)`
# @raise [RuntimeError] "PID must be given" when +pid+ is not present
def self.from_pid(pid)
  return new(pid) if pid.present?

  raise "PID must be given"
end

.job_checked_in(status, options) ⇒ Object



131
132
133
134
# File 'lib/canvas_sync/job_batches/pool.rb', line 131

# Class-level entry point for a job check-in: resolves the pool named by
# `options['pool_id']` through .from_pid and forwards the call to that
# instance's #job_checked_in.
#
# @param status [Object] passed through unchanged
# @param options [Hash] must carry the pool id under the string key 'pool_id'
# @return [Object] whatever the instance-level #job_checked_in returns
def self.job_checked_in(status, options)
  pool = from_pid(options['pool_id'])
  pool.job_checked_in(status, options)
end

Instance Method Details

#<<(job_desc) ⇒ Object



31
32
33
# File 'lib/canvas_sync/job_batches/pool.rb', line 31

# Shovel-operator shorthand for #add_job.
#
# @param job_desc [Object] job description, forwarded unchanged to #add_job
# @return [Object] the return value of #add_job (not +self+)
def <<(job_desc)
  add_job(job_desc)
end

#active_count ⇒ Object



93
94
95
# File 'lib/canvas_sync/job_batches/pool.rb', line 93

# Reads the "active_count" field of this pool's Redis hash.
#
# The value is fetched with HGET instead of `HINCRBY key field 0`. HINCRBY is
# a write command: when the hash (or field) is missing it creates it, so a
# plain read would silently re-create the key of a pool that has already been
# removed by #cleanup_redis. #job_checked_in uses the existence of that key to
# decide whether the pool is still alive, so reads must not bring it back.
#
# @return [Integer] the stored counter, or 0 when the hash or field is absent
def active_count
  redis.hget(redis_key, "active_count").to_i
end

#add_job(job_desc) ⇒ Object



35
36
37
# File 'lib/canvas_sync/job_batches/pool.rb', line 35

# Adds a single job to the pool by wrapping it in a one-element array and
# delegating to #add_jobs.
#
# @param job_desc [Object] one job description, in the form #add_jobs expects
# @return [Object] the return value of #add_jobs
def add_job(job_desc)
  add_jobs([job_desc])
end

#add_jobs(job_descs) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/canvas_sync/job_batches/pool.rb', line 39

# Queues several jobs in the pool.
#
# For every description a fresh wrapper Batch is created whose callback
# ("<class>.job_checked_in", carrying this pool's id) fires on :success when
# `on_failed_job` is :wait and on :complete otherwise. The description is
# copied with symbolized keys, its :job entry is stringified, the wrapper's
# bid is recorded under :pool_wrapper_batch, and the result is handed to
# #push_job_to_pool. After all descriptions are pushed, #refill_allotment runs.
#
# @param job_descs [Enumerable<Hash>] job descriptions (each responds to `symbolize_keys`)
# @return [Object] the return value of #refill_allotment
def add_jobs(job_descs)
  job_descs.each do |raw_desc|
    checkin_batch = Batch.new
    checkin_batch.description = "Pool Job Wrapper (PID: #{pid})"
    event = on_failed_job == :wait ? :success : :complete
    checkin_batch.on(event, "#{self.class}.job_checked_in", pool_id: pid)
    # Deliberately empty block — presumably just opens/persists the wrapper
    # batch without enqueuing anything; confirm against Batch#jobs.
    checkin_batch.jobs {}

    # symbolize_keys returns a copy, so the caller's hash is left untouched.
    pooled_desc = raw_desc.symbolize_keys
    pooled_desc[:job] = pooled_desc[:job].to_s
    pooled_desc[:pool_wrapper_batch] = checkin_batch.bid

    push_job_to_pool(pooled_desc)
  end
  refill_allotment
end

#cleanup_redis ⇒ Object



82
83
84
85
86
87
88
89
90
91
# File 'lib/canvas_sync/job_batches/pool.rb', line 82

# Removes this pool's data from Redis: drops the pid from the "pools" sorted
# set, then unlinks the pool hash key and its "-jobs" companion key.
# A debug line is logged first via Batch.logger.
#
# @return [Object] whatever the `redis` block helper returns for the UNLINK call
def cleanup_redis
  Batch.logger.debug { "Cleaning redis of pool #{pid}" }
  redis do |conn|
    conn.zrem("pools", pid)
    doomed_keys = [redis_key.to_s, "#{redis_key}-jobs"]
    conn.unlink(*doomed_keys)
  end
end

#job_checked_in(status, options) ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/canvas_sync/job_batches/pool.rb', line 110

# Handles one job of this pool checking in.
#
# Does nothing when the pool hash no longer exists in Redis. Otherwise, inside
# a MULTI, the "active_count" field is decremented and the pending-job count
# is read; #refill_allotment is then invoked. If no job is active, none is
# pending, and the refill added nothing, the pool is removed through
# #cleanup_redis — provided `clean_when_empty` is truthy and the hash's
# 'keep_open' field is not the string 'true'.
#
# Neither +status+ nor +options+ is read by this method's body.
#
# @param status [Object] unused here
# @param options [Object] unused here
# @return [Object, nil] result of #cleanup_redis when cleanup ran, else nil
def job_checked_in(status, options)
  remaining_active, remaining_pending = redis do |conn|
    return unless conn.exists?(redis_key)

    # Touch `order` before entering the transaction so that any lookup it
    # performs happens outside of the MULTI.
    self.order

    redis.multi do |tx|
      tx.hincrby(redis_key, "active_count", -1)
      pending_count(tx)
    end
  end

  added_count = refill_allotment
  return unless remaining_active == 0 && added_count == 0 && remaining_pending == 0
  return unless clean_when_empty && redis.hget(redis_key, 'keep_open') != 'true'

  cleanup_redis
end

#keep_open! ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/canvas_sync/job_batches/pool.rb', line 58

# Marks the pool as "keep open" by setting the 'keep_open' field of the pool
# hash to true.
#
# Block form: sets the flag, yields, and always calls #let_close! afterwards
# (even when the block raises).
#
# @yield optional work to perform while the pool is held open
# @return [Object] the HSET reply without a block; the block's value with one
def keep_open!
  return redis.hset(redis_key, 'keep_open', true) unless block_given?

  begin
    # Block-less recursive call: just sets the flag.
    keep_open!
    yield
  ensure
    let_close!
  end
end

#let_close! ⇒ Object



71
72
73
74
75
76
77
78
79
80
# File 'lib/canvas_sync/job_batches/pool.rb', line 71

# Clears the 'keep_open' flag and, if the pool is idle, removes it.
#
# In one MULTI the 'keep_open' field is set to false and "active_count" is
# read back (HINCRBY by 0). When that count and #pending_count are both zero
# and `clean_when_empty` is truthy, #cleanup_redis is called.
#
# @return [Object, nil] result of #cleanup_redis when cleanup ran, else nil
def let_close!
  _flag_reply, still_active = redis.multi do |tx|
    tx.hset(redis_key, 'keep_open', false)
    tx.hincrby(redis_key, "active_count", 0)
  end

  return unless still_active == 0 && pending_count == 0

  cleanup_redis if clean_when_empty
end

#pending_count(r = redis) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/canvas_sync/job_batches/pool.rb', line 97

# Counts the jobs waiting in this pool's "<redis_key>-jobs" structure.
#
# The Redis command depends on the pool's `order` (default 'fifo'):
# LLEN for :fifo/:lifo, SCARD for :random, ZCARD for :priority.
#
# @param r [Object] Redis-like object the command is issued on; defaults to `redis`
# @return [Object, nil] the command's reply, or nil for an unrecognised order
def pending_count(r = redis)
  jobs_key = "#{redis_key}-jobs"
  strategy = (self.order || 'fifo').to_sym
  command = { fifo: :llen, lifo: :llen, random: :scard, priority: :zcard }[strategy]
  r.public_send(command, jobs_key) if command
end