Module: Delayer::Extend
- Defined in:
- lib/delayer/extend.rb
Instance Attribute Summary collapse
-
#exception ⇒ Object
readonly
Returns the value of attribute exception.
-
#expire ⇒ Object
Returns the value of attribute expire.
Class Method Summary collapse
Instance Method Summary collapse
-
#busy? ⇒ Boolean
Return if some jobs processing now.
-
#empty? ⇒ Boolean
Return true if no jobs has.
- #expire? ⇒ Boolean
- #get_prev_point(priority) ⇒ Object
- #pop_reserve(start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)) ⇒ Object
-
#register(procedure) ⇒ Object
register new job.
- #register_remain_hook(&proc) ⇒ Object
-
#reserve(procedure) ⇒ Object
Register reserved job.
-
#run(current_expire = @expire) ⇒ Object
Run registered jobs.
-
#run_once ⇒ Object
Run a job and forward pointer.
-
#size(node = @bucket.first) ⇒ Object
Return remain jobs quantity.
-
#stash_enter! ⇒ Object
DelayerのStashレベルをインクリメントする。 このメソッドが呼ばれたら、その時存在するジョブは退避され、stash_exit!が呼ばれるまで実行されない。.
-
#stash_exit! ⇒ Object
DelayerのStashレベルをデクリメントする。 このメソッドを呼ぶ前に、現在のレベルに存在するすべてのジョブを実行し、Delayer#empty?がtrueを返すような状態になっている必要がある。 ==== Raises [Delayer::NoLowerLevelError] stash_enter!が呼ばれていない時 [Delayer::RemainJobsError] ジョブが残っているのにこのメソッドを呼んだ時.
-
#stash_level ⇒ Object
現在のDelayer Stashレベルを返す。.
- #validate_priority(symbol) ⇒ Object
Instance Attribute Details
#exception ⇒ Object (readonly)
Returns the value of attribute exception.
56 57 58 |
# File 'lib/delayer/extend.rb', line 56 def exception @exception end |
#expire ⇒ Object
Returns the value of attribute expire.
55 56 57 |
# File 'lib/delayer/extend.rb', line 55 def expire @expire end |
Class Method Details
.extended(klass) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/delayer/extend.rb', line 58 def self.extended(klass) klass.class_eval do @busy = false @expire = 0 @remain_hook = nil @exception = nil @remain_received = false @lock = Monitor.new @bucket = Bucket.new(nil, nil, {}, nil) @last_reserve = nil @reserves = Set.new end end |
Instance Method Details
#busy? ⇒ Boolean
Return if some jobs processing now.
Args
- args
-
Return
true if Delayer processing job
138 139 140 |
# File 'lib/delayer/extend.rb', line 138 def busy? @busy end |
#empty? ⇒ Boolean
Return true if no jobs has.
Return
true if no jobs has.
145 146 147 |
# File 'lib/delayer/extend.rb', line 145 def empty? !@bucket.first end |
#expire? ⇒ Boolean
108 109 110 |
# File 'lib/delayer/extend.rb', line 108 def expire? !!@end_time&.<(Time.new.to_f) end |
#get_prev_point(priority) ⇒ Object
211 212 213 214 215 216 217 218 219 220 |
# File 'lib/delayer/extend.rb', line 211 def get_prev_point(priority) if @bucket.priority_of[priority] @bucket.priority_of[priority] else @priorities.index(priority)&.yield_self do |index| next_index = index - 1 get_prev_point @priorities[next_index] if next_index >= 0 end end end |
#pop_reserve(start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)) ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/delayer/extend.rb', line 72 def pop_reserve(start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)) if @last_reserve&.reserve_at&.<=(start_time) lock.synchronize do while @last_reserve&.reserve_at&.<=(start_time) @last_reserve.register @last_reserve = @reserves.min @reserves.delete(@last_reserve) end end end end |
#register(procedure) ⇒ Object
register new job.
Args
- procedure
-
job(Delayer::Procedure)
Return
self
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/delayer/extend.rb', line 165 def register(procedure) priority = procedure.delayer.priority lock.synchronize do last_pointer = get_prev_point(priority) if last_pointer @bucket.priority_of[priority] = last_pointer.break procedure else procedure.next = @bucket.first @bucket.priority_of[priority] = @bucket.first = procedure end @bucket.last = @bucket.priority_of[priority] if @bucket.last if @remain_hook && !@remain_received @remain_received = true @remain_hook.call end end self end |
#register_remain_hook(&proc) ⇒ Object
207 208 209 |
# File 'lib/delayer/extend.rb', line 207 def register_remain_hook(&proc) @remain_hook = proc end |
#reserve(procedure) ⇒ Object
Register reserved job. It does not execute immediately. it calls register() in procedure.reserve_at.
Args
- procedure
-
job(Delayer::DelayedProcedure)
Return
self
191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 |
# File 'lib/delayer/extend.rb', line 191 def reserve(procedure) lock.synchronize do if @last_reserve if @last_reserve > procedure @reserves.add(@last_reserve) @last_reserve = procedure else @reserves.add(procedure) end else @last_reserve = procedure end end self end |
#run(current_expire = @expire) ⇒ Object
Run registered jobs.
Args
- current_expire
-
expire for processing (secs, 0=unexpired)
Return
self
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/delayer/extend.rb', line 89 def run(current_expire = @expire) start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC).to_f pop_reserve(start_time) if current_expire == 0 run_once_without_pop_reserve until empty? else @end_time = end_time = start_time + @expire run_once_without_pop_reserve while !empty? && (end_time >= Process.clock_gettime(Process::CLOCK_MONOTONIC)) @end_time = nil end if @remain_hook @remain_received = !empty? @remain_hook.call if @remain_received end rescue Exception => e @exception = e raise e end |
#run_once ⇒ Object
Run a job and forward pointer.
Return
self
115 116 117 118 |
# File 'lib/delayer/extend.rb', line 115 def run_once pop_reserve run_once_without_pop_reserve end |
#size(node = @bucket.first) ⇒ Object
Return remain jobs quantity.
Return
Count of remain jobs
152 153 154 155 156 157 158 |
# File 'lib/delayer/extend.rb', line 152 def size(node = @bucket.first) if node 1 + size(node.next) else 0 end end |
#stash_enter! ⇒ Object
DelayerのStashレベルをインクリメントする。このメソッドが呼ばれたら、その時存在するジョブは退避され、stash_exit!が呼ばれるまで実行されない。
230 231 232 233 |
# File 'lib/delayer/extend.rb', line 230 def stash_enter! @bucket = Bucket.new(nil, nil, {}, @bucket) self end |
#stash_exit! ⇒ Object
DelayerのStashレベルをデクリメントする。このメソッドを呼ぶ前に、現在のレベルに存在するすべてのジョブを実行し、Delayer#empty?がtrueを返すような状態になっている必要がある。
Raises
- Delayer::NoLowerLevelError
-
stash_enter!が呼ばれていない時
- Delayer::RemainJobsError
-
ジョブが残っているのにこのメソッドを呼んだ時
240 241 242 243 244 245 246 |
# File 'lib/delayer/extend.rb', line 240 def stash_exit! stashed = @bucket.stashed raise Delayer::NoLowerLevelError, 'stash_exit! called in level 0.' unless stashed raise Delayer::RemainJobsError, 'Current level has remain jobs. It must be empty current level jobs in call this method.' unless empty? @bucket = stashed end |
#stash_level ⇒ Object
現在のDelayer Stashレベルを返す。
249 250 251 |
# File 'lib/delayer/extend.rb', line 249 def stash_level @bucket.stash_size end |
#validate_priority(symbol) ⇒ Object
222 223 224 225 226 |
# File 'lib/delayer/extend.rb', line 222 def validate_priority(symbol) unless @priorities.include? symbol raise Delayer::InvalidPriorityError, "undefined priority '#{symbol}'" end end |