Module: Delayer::Extend

Defined in:
lib/delayer/extend.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#exception ⇒ Object (readonly)

Returns the value of attribute exception.



40
41
42
# File 'lib/delayer/extend.rb', line 40

# Reader for @exception.
#
# @exception starts out as nil (see .extended) and is overwritten by #run
# with the exception it rescued just before re-raising it.
def exception
  @exception
end

#expire ⇒ Object

Returns the value of attribute expire.



39
40
41
# File 'lib/delayer/extend.rb', line 39

# Reader for @expire.
#
# @expire is initialised to 0 in .extended and is the default value of
# #run's current_expire argument, where 0 means "run until the queue is empty".
def expire
  @expire
end

Class Method Details

.extended(klass) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
# File 'lib/delayer/extend.rb', line 42

# Hook Ruby calls when this module is extended into +klass+.
#
# Seeds the state the other methods of this module read, as instance
# variables of +klass+ itself: the busy and remain-received flags (false),
# the remain hook and last exception (nil), the default expire (0), a Mutex
# in @lock and an empty Bucket in @bucket.
#
# @param klass [Class, Module] the object being extended
# @return [Bucket] the freshly created bucket (last expression of the block)
def self.extended(klass)
  klass.class_eval do
    @busy = @remain_received = false
    @remain_hook = @exception = nil
    @expire = 0
    @lock = Mutex.new
    # Kept last so the block (and this method) still evaluates to the bucket.
    @bucket = Bucket.new(nil, nil, {}, nil)
  end
end

Instance Method Details

#busy? ⇒ Boolean

Returns whether a job is being processed right now.

Args

args

Return

true if Delayer is processing a job

Returns:

  • (Boolean)


103
104
105
# File 'lib/delayer/extend.rb', line 103

# Reports the @busy flag.
#
# #run_once sets @busy to true before it takes a job off the queue and
# resets it to false in its ensure clause, so this is true only while
# #run_once is executing.
def busy?
  @busy
end

#empty? ⇒ Boolean

Returns true if there are no jobs.

Return

true if there are no jobs.

Returns:

  • (Boolean)


110
111
112
# File 'lib/delayer/extend.rb', line 110

# Tells whether the current bucket has no job at its head.
#
# @return [Boolean] true when @bucket.first is nil/false, false otherwise
def empty?
  @bucket.first ? false : true
end

#expire? ⇒ Boolean

Returns:

  • (Boolean)


76
77
78
79
80
81
82
# File 'lib/delayer/extend.rb', line 76

# Tells whether the deadline stored in @end_time has already passed.
#
# @end_time is only set by #run while it works under a time budget; when it
# is undefined or nil/false there is no deadline and the answer is false.
#
# @return [Boolean]
def expire?
  deadline = defined?(@end_time) && @end_time
  return false unless deadline

  deadline < Time.now.to_f
end

#get_prev_point(priority) ⇒ Object



155
156
157
158
159
160
161
162
# File 'lib/delayer/extend.rb', line 155

# Looks up the entry recorded in @bucket.priority_of for +priority+; when
# there is none, falls back to the entry of the priority listed just before
# it in @priorities, and so on towards the front of that list.
#
# @param priority a member of @priorities (an unknown value makes
#   @priorities.index return nil and the subtraction raise NoMethodError)
# @return the recorded entry, or nil when neither +priority+ nor any
#   priority listed before it has one
def get_prev_point(priority)
  loop do
    found = @bucket.priority_of[priority]
    return found if found

    earlier = @priorities.index(priority) - 1
    return nil if earlier < 0

    priority = @priorities[earlier]
  end
end

#register(procedure) ⇒ Object

Register a new job.

Args

procedure

job(Delayer::Procedure)

Return

self



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/delayer/extend.rb', line 130

# Inserts +procedure+ into the current bucket's linked list of jobs.
#
# The whole update runs inside lock.synchronize. The job is spliced in after
# the node returned by get_prev_point for its priority (via that node's
# #break), or becomes the new head when there is no such node. Afterwards
# the remain hook, if any, is called once until #run clears
# @remain_received again.
#
# @param procedure job to enqueue; must respond to #delayer (whose
#   #priority selects the slot) and #next=
# @return [self]
def register(procedure)
  priority = procedure.delayer.priority
  lock.synchronize do
    anchor = get_prev_point(priority)
    if anchor
      @bucket.priority_of[priority] = anchor.break(procedure)
    else
      # No job of this or an earlier priority yet: put the job at the head.
      procedure.next = @bucket.first
      @bucket.first = procedure
      @bucket.priority_of[priority] = procedure
    end
    # Only refreshed when a last pointer has already been set.
    @bucket.last = @bucket.priority_of[priority] if @bucket.last
    if @remain_hook && !@remain_received
      @remain_received = true
      @remain_hook.call
    end
  end
  self
end

#register_remain_hook ⇒ Object



151
152
153
# File 'lib/delayer/extend.rb', line 151

# Stores the given block in @remain_hook.
#
# #register calls the hook when a job is added and @remain_received is not
# yet set; #run calls it when jobs are still queued after it finishes.
#
# The block is captured through an explicit &block parameter: a bare
# Proc.new that picks up the caller's block only works on Ruby < 3.0 and
# raises ArgumentError on later versions even when a block was passed.
#
# @yield the hook body; it is called without arguments
# @return [Proc] the stored hook
# @raise [ArgumentError] when called without a block
def register_remain_hook(&block)
  raise ArgumentError, 'tried to create Proc object without a block' unless block

  @remain_hook = block
end

#run(current_expire = @expire) ⇒ Object

Run registered jobs.

Args

current_expire

time limit for processing, in seconds (0 = no time limit)

Return

self



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/delayer/extend.rb', line 59

# Runs queued jobs, either until the queue is empty or until a time budget
# is used up.
#
# With a non-zero budget the deadline is kept in @end_time (which #expire?
# reads) for the duration of the loop and is cleared afterwards, even when
# a job raises. After the loop, if a remain hook is registered, it is called
# when jobs are still queued and @remain_received records that fact.
#
# @param current_expire [Numeric] time budget in seconds; 0 means "no limit,
#   drain the queue". Defaults to @expire.
# @return [self]
# @raise [Exception] whatever a job (or the hook) raises; the exception is
#   stored in @exception before being re-raised
def run(current_expire = @expire)
  if 0 == current_expire
    run_once until empty?
  else
    begin
      # Honour the budget that was actually passed in, not only the default.
      @end_time = Time.new.to_f + current_expire
      run_once while !empty? && @end_time >= Time.new.to_f
    ensure
      # Never leave a stale deadline behind, or #expire? would keep
      # reporting true after a failed run.
      @end_time = nil
    end
  end
  if @remain_hook
    @remain_received = !empty?
    @remain_hook.call if @remain_received
  end
  self
rescue Exception => e # recorded and re-raised unchanged, never swallowed
  @exception = e
  raise
end

#run_once ⇒ Object

Run a job and forward pointer.

Return

the value returned by the executed job, or nil when no job was run



87
88
89
90
91
92
93
94
95
96
# File 'lib/delayer/extend.rb', line 87

# Takes jobs off the front of the queue (via forward) until it has one that
# is not canceled or the queue is exhausted, then runs that job.
#
# @busy is true while this happens and is always reset to false afterwards,
# including when the job raises.
#
# @return the value of the job's #run, or nil when the queue was empty or
#   only canceled jobs were found
def run_once
  return unless @bucket.first

  @busy = true
  job = forward
  # Skip over canceled jobs as long as there is something left to take.
  job = forward while @bucket.first && job.canceled?
  job.canceled? ? nil : job.run
ensure
  @busy = false
end

#size(node = @bucket.first) ⇒ Object

Returns the number of remaining jobs.

Return

Count of remaining jobs



117
118
119
120
121
122
123
# File 'lib/delayer/extend.rb', line 117

# Counts the jobs in the linked list starting at +node+.
#
# The list is walked with a loop rather than recursively, so the count does
# not use one stack frame per job and cannot raise SystemStackError on a
# long queue.
#
# @param node first node to count (followed through #next); defaults to the
#   head of the current bucket
# @return [Integer] number of nodes reachable from +node+, 0 for nil
def size(node = @bucket.first)
  count = 0
  while node
    count += 1
    node = node.next
  end
  count
end

#stash_enter! ⇒ Object

DelayerのStashレベルをインクリメントする。 このメソッドが呼ばれたら、その時存在するジョブは退避され、stash_exit!が呼ばれるまで実行されない。



172
173
174
175
# File 'lib/delayer/extend.rb', line 172

# Raises the stash level by one: a new, empty Bucket becomes current and
# keeps the previous bucket in its fourth (stashed) slot, so the jobs that
# were queued so far stay out of reach until #stash_exit! restores them.
#
# @return [self]
def stash_enter!
  outer = @bucket
  @bucket = Bucket.new(nil, nil, {}, outer)
  self
end

#stash_exit! ⇒ Object

DelayerのStashレベルをデクリメントする。 このメソッドを呼ぶ前に、現在のレベルに存在するすべてのジョブを実行し、Delayer#empty?がtrueを返すような状態になっている必要がある。

Raises

Delayer::NoLowerLevelError

stash_enter!が呼ばれていない時

Delayer::RemainJobsError

ジョブが残っているのにこのメソッドを呼んだ時



182
183
184
185
186
# File 'lib/delayer/extend.rb', line 182

# Lowers the stash level by one, making the bucket that was stashed by
# #stash_enter! current again.
#
# @return [Bucket] the restored bucket
# @raise [Delayer::NoLowerLevelError] when the current bucket has nothing
#   stashed, i.e. #stash_enter! has not been called
# @raise [Delayer::RemainJobsError] when the current level still has jobs
def stash_exit!
  outer = @bucket.stashed
  unless outer
    raise Delayer::NoLowerLevelError, 'stash_exit! called in level 0.'
  end
  unless empty?
    raise Delayer::RemainJobsError, 'Current level has remain jobs. It must be empty current level jobs in call this method.'
  end

  @bucket = outer
end

#stash_level ⇒ Object

現在のDelayer Stashレベルを返す。



189
190
191
# File 'lib/delayer/extend.rb', line 189

# Returns the current stash level by delegating to the current bucket's
# #stash_size.
#
# NOTE(review): Bucket#stash_size is not visible here; presumably it counts
# the buckets chained through #stashed (one per #stash_enter! that has not
# been undone by #stash_exit!) — confirm against the Bucket definition.
def stash_level
  @bucket.stash_size
end

#validate_priority(symbol) ⇒ Object



164
165
166
167
168
# File 'lib/delayer/extend.rb', line 164

# Checks that +symbol+ is one of the priorities listed in @priorities.
#
# @param symbol the priority to check
# @return [nil] when the priority is known
# @raise [Delayer::InvalidPriorityError] when it is not
def validate_priority(symbol)
  return if @priorities.include?(symbol)

  raise Delayer::InvalidPriorityError, "undefined priority '#{symbol}'"
end