Module: Delayer::Extend

Defined in:
lib/delayer/extend.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#exception ⇒ Object (readonly)

Returns the value of attribute exception.



56
57
58
# File 'lib/delayer/extend.rb', line 56

# Reader for @exception: the exception most recently rescued by #run.
# It is initialised to nil in .extended and stays nil until #run rescues something.
def exception
  @exception
end

#expire ⇒ Object

Returns the value of attribute expire.



55
56
57
# File 'lib/delayer/extend.rb', line 55

# Reader for @expire, the value #run uses as its default +current_expire+
# argument. It is initialised to 0 in .extended.
def expire
  @expire
end

Class Method Details

.extended(klass) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/delayer/extend.rb', line 58

# Hook run when a class extends this module: seeds the per-class scheduler
# state as instance variables on +klass+ itself.
# ==== Args
# [klass] the class that is extending this module
# ==== Return
# the freshly created, empty reserve Set (the value also stored in @reserves)
def self.extended(klass)
  initial_state = {
    :@busy => false,
    :@expire => 0,
    :@remain_hook => nil,
    :@exception => nil,
    :@remain_received => false,
    :@lock => Monitor.new,
    :@bucket => Bucket.new(nil, nil, {}, nil),
    :@last_reserve => nil,
    :@reserves => Set.new
  }
  initial_state.each { |ivar, value| klass.instance_variable_set(ivar, value) }
  initial_state[:@reserves]
end

Instance Method Details

#busy? ⇒ Boolean

Returns whether a job is currently being processed.

Args

args

Return

true if the Delayer is processing a job

Returns:

  • (Boolean)


138
139
140
# File 'lib/delayer/extend.rb', line 138

# Returns the @busy flag, which .extended initialises to false.
# NOTE(review): the code that toggles @busy is outside this view; presumably
# it is set while a job is executing — confirm.
def busy?
  @busy
end

#empty? ⇒ Boolean

Returns true if there are no jobs.

Return

true if there are no jobs.

Returns:

  • (Boolean)


145
146
147
# File 'lib/delayer/extend.rb', line 145

# Tells whether the current bucket has no job at its head.
# ==== Return
# true when @bucket.first is nil/false, false otherwise
def empty?
  @bucket.first ? false : true
end

#expire? ⇒ Boolean

Returns:

  • (Boolean)


108
109
110
# File 'lib/delayer/extend.rb', line 108

# Tells whether the deadline of the timed #run currently in progress has passed.
#
# #run derives @end_time from Process::CLOCK_MONOTONIC, so the comparison must
# use the same clock. Comparing against wall-clock epoch seconds would report
# "expired" for practically every deadline.
# ==== Return
# true if @end_time is set and lies in the past, false otherwise
# (including when no timed run is active and @end_time is nil)
def expire?
  deadline = @end_time
  return false unless deadline

  deadline < Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

#get_prev_point(priority) ⇒ Object



211
212
213
214
215
216
217
218
219
220
# File 'lib/delayer/extend.rb', line 211

# Finds the pointer recorded in @bucket.priority_of for +priority+, or failing
# that for the nearest earlier entry of @priorities that has one.
# ==== Args
# [priority] priority to start the search from
# ==== Return
# the recorded pointer, or nil when +priority+ is not in @priorities or no
# earlier priority has a pointer
def get_prev_point(priority)
  candidate = priority
  loop do
    pointer = @bucket.priority_of[candidate]
    return pointer if pointer

    position = @priorities.index(candidate)
    # Unknown priority, or already at the first one: nothing precedes it.
    return nil if position.nil? || position.zero?

    candidate = @priorities[position - 1]
  end
end

#pop_reserve(start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
# File 'lib/delayer/extend.rb', line 72

# Hands every reserved job whose reserve_at is at or before +start_time+ over
# to its own #register, then promotes the next-earliest job from @reserves to
# @last_reserve.
# ==== Args
# [start_time] reference time; defaults to the current monotonic clock reading
# ==== Return
# nil
def pop_reserve(start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC))
  due = ->(job) { job&.reserve_at&.<=(start_time) }
  # Check once before taking the lock so the common "nothing due" case is lock-free.
  return unless due.call(@last_reserve)

  lock.synchronize do
    while due.call(@last_reserve)
      @last_reserve.register
      upcoming = @reserves.min
      @reserves.delete(upcoming)
      @last_reserve = upcoming
    end
  end
end

#register(procedure) ⇒ Object

Registers a new job.

Args

procedure

job(Delayer::Procedure)

Return

self



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/delayer/extend.rb', line 165

# Links +procedure+ into the current bucket's job chain at the position for its
# priority, and fires the remain hook once if one is set and not yet notified.
# ==== Args
# [procedure] job (Delayer::Procedure); its priority is read from procedure.delayer.priority
# ==== Return
# self
def register(procedure)
  priority = procedure.delayer.priority
  lock.synchronize do
    predecessor = get_prev_point(priority)
    newest =
      if predecessor
        # Splice in after the last job of this (or the nearest earlier) priority.
        predecessor.break(procedure)
      else
        # No predecessor: the job becomes the new head of the chain.
        procedure.next = @bucket.first
        @bucket.first = procedure
      end
    @bucket.priority_of[priority] = newest
    @bucket.last = newest if @bucket.last

    hook = @remain_hook
    if hook && !@remain_received
      @remain_received = true
      hook.call
    end
  end
  self
end

#register_remain_hook(&proc) ⇒ Object



207
208
209
# File 'lib/delayer/extend.rb', line 207

# Stores the given block in @remain_hook, replacing any previous one, and
# returns it (nil when called without a block).
# #register and #run call this hook when jobs are pending.
def register_remain_hook(&proc)
  @remain_hook = proc
end

#reserve(procedure) ⇒ Object

Registers a reserved job. It is not executed immediately; register() is called for it once procedure.reserve_at is reached.

Args

procedure

job(Delayer::DelayedProcedure)

Return

self



191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/delayer/extend.rb', line 191

# Queues a reserved job. The earliest job is kept in @last_reserve and all
# others go into the @reserves set.
# ==== Args
# [procedure] job (Delayer::DelayedProcedure); must be comparable with other reserved jobs
# ==== Return
# self
def reserve(procedure)
  lock.synchronize do
    if !@last_reserve
      @last_reserve = procedure
    elsif @last_reserve > procedure
      # The new job is due sooner: demote the current front-runner.
      @reserves << @last_reserve
      @last_reserve = procedure
    else
      @reserves << procedure
    end
  end
  self
end

#run(current_expire = @expire) ⇒ Object

Run registered jobs.

Args

current_expire

expire for processing (secs, 0=unexpired)

Return

self



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/delayer/extend.rb', line 89

# Run registered jobs.
#
# Due reserved jobs are first moved into the queue via #pop_reserve. Jobs are
# then executed either until the queue is empty (+current_expire+ == 0) or until
# +current_expire+ seconds have elapsed on the monotonic clock. Afterwards the
# remain hook, if any, is called when jobs are still pending.
# ==== Args
# [current_expire] time budget for this call (secs, 0=unexpired); defaults to @expire
# ==== Return
# NOTE(review): the rendered docs promise self, but the value of the trailing
# remain-hook branch is what is actually returned; kept as is for compatibility.
# ==== Raises
# Any exception raised while running is stored in @exception and re-raised.
def run(current_expire = @expire)
  start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC).to_f
  pop_reserve(start_time)
  if current_expire == 0
    run_once_without_pop_reserve until empty?
  else
    begin
      # Use the caller's budget rather than the class-wide @expire, so that
      # run(n) really limits this call to n seconds.
      @end_time = end_time = start_time + current_expire
      run_once_without_pop_reserve while !empty? && (end_time >= Process.clock_gettime(Process::CLOCK_MONOTONIC))
    ensure
      # Clear the deadline even when a job raises, so #expire? does not keep
      # reporting against a stale deadline.
      @end_time = nil
    end
  end
  if @remain_hook
    @remain_received = !empty?
    @remain_hook.call if @remain_received
  end
rescue Exception => e # rubocop:disable Lint/RescueException -- recorded for #exception, then re-raised
  @exception = e
  raise
end

#run_once ⇒ Object

Run a job and forward pointer.

Return

self



115
116
117
118
# File 'lib/delayer/extend.rb', line 115

# Moves due reserved jobs into the queue via #pop_reserve (with its default
# time argument), then delegates to run_once_without_pop_reserve.
# NOTE(review): the return value is whatever run_once_without_pop_reserve
# returns; that method is outside this view — confirm it returns self as the
# rendered docs state.
def run_once
  pop_reserve
  run_once_without_pop_reserve
end

#size(node = @bucket.first) ⇒ Object

Returns the number of remaining jobs.

Return

Count of remaining jobs



152
153
154
155
156
157
158
# File 'lib/delayer/extend.rb', line 152

# Return remaining jobs quantity.
#
# Walks the job chain iteratively. One recursive call per job would exhaust the
# VM stack (SystemStackError) once the queue holds many thousands of jobs.
# ==== Args
# [node] job to start counting from; defaults to the head of the current bucket
# ==== Return
# Count of jobs reachable from +node+ through #next (0 when +node+ is nil)
def size(node = @bucket.first)
  count = 0
  while node
    count += 1
    node = node.next
  end
  count
end

#stash_enter!Object

DelayerのStashレベルをインクリメントする。このメソッドが呼ばれたら、その時存在するジョブは退避され、stash_exit!が呼ばれるまで実行されない。



230
231
232
233
# File 'lib/delayer/extend.rb', line 230

# Increments the stash level: replaces the current bucket with a fresh, empty
# one that keeps the previous bucket as its stashed predecessor.
# ==== Return
# self
def stash_enter!
  outer = @bucket
  @bucket = Bucket.new(nil, nil, {}, outer)
  self
end

#stash_exit!Object

DelayerのStashレベルをデクリメントする。このメソッドを呼ぶ前に、現在のレベルに存在するすべてのジョブを実行し、Delayer#empty?がtrueを返すような状態になっている必要がある。

Raises

Delayer::NoLowerLevelError

stash_enter!が呼ばれていない時

Delayer::RemainJobsError

ジョブが残っているのにこのメソッドを呼んだ時



240
241
242
243
244
245
246
# File 'lib/delayer/extend.rb', line 240

# Decrements the stash level by restoring the bucket that was stashed by the
# matching stash_enter!. The current level must already be empty.
# ==== Return
# the restored bucket
# ==== Raises
# [Delayer::NoLowerLevelError] when there is no stashed bucket to return to
# [Delayer::RemainJobsError] when the current level still has jobs
def stash_exit!
  lower_level = @bucket.stashed
  if !lower_level
    raise Delayer::NoLowerLevelError, 'stash_exit! called in level 0.'
  elsif !empty?
    raise Delayer::RemainJobsError, 'Current level has remain jobs. It must be empty current level jobs in call this method.'
  end

  @bucket = lower_level
end

#stash_levelObject

現在のDelayer Stashレベルを返す。



249
250
251
# File 'lib/delayer/extend.rb', line 249

# Returns the current stash level by delegating to @bucket.stash_size.
# NOTE(review): stash_size is defined on Bucket, outside this view; presumably
# it counts the chain of stashed buckets — confirm.
def stash_level
  @bucket.stash_size
end

#validate_priority(symbol) ⇒ Object



222
223
224
225
226
# File 'lib/delayer/extend.rb', line 222

# Ensures +symbol+ is one of the priorities listed in @priorities.
# ==== Args
# [symbol] priority to check
# ==== Return
# nil when the priority is known
# ==== Raises
# [Delayer::InvalidPriorityError] when +symbol+ is not in @priorities
def validate_priority(symbol)
  return if @priorities.include?(symbol)

  raise Delayer::InvalidPriorityError, "undefined priority '#{symbol}'"
end