Class: Utilrb::ThreadPool::Task
Overview
A Task is executed by the thread pool as soon as a free thread is available.
Constant Summary
- Asked = Class.new(Exception)
Instance Attribute Summary
-
#description ⇒ Object
Custom description which can be used to store a human readable object.
-
#exception ⇒ Exception
readonly
The exception thrown by the custom code block.
-
#pool ⇒ ThreadPool
readonly
Thread pool the task belongs to.
-
#queued_at ⇒ Object
The time the task was queued.
-
#result ⇒ Object
readonly
Result of the code block call.
-
#started_at ⇒ Object
readonly
The time the task was started.
-
#state ⇒ :waiting, ...
readonly
State of the task.
-
#stopped_at ⇒ Object
readonly
The time the task was stopped or finished.
-
#sync_key ⇒ Object
readonly
The sync key specifies that a task must not run in parallel with another task having the same sync key.
-
#thread ⇒ Object
readonly
The thread the task was assigned to.
Instance Method Summary
-
#callback {|Object, Exception| ... } ⇒ Object
Called from the worker thread when the work is done.
-
#default? ⇒ Boolean
Returns true if the task has a default return value.
-
#exception? ⇒ Boolean
Checks if an exception occurred.
-
#execute ⇒ Object
Executes the task.
-
#finalize ⇒ Object
Propagates the task's state. Should be called after execute.
-
#finished? ⇒ Boolean
Checks if the task was stopped or finished.
-
#initialize(options = Hash.new, *args, &block) ⇒ Task
constructor
A new task which can be added to the work queue of a Utilrb::ThreadPool.
-
#pre_execute(pool = nil) ⇒ Object
Sets all internal state to running. Call execute after that.
-
#reset ⇒ Object
Resets the task.
-
#running? ⇒ Boolean
Checks if the task is running.
-
#started? ⇒ Boolean
Checks if the task was started.
-
#stopping? ⇒ Boolean
Checks if the task is going to be stopped.
-
#successfull? ⇒ Boolean
Checks if the task was successfully finished.
-
#terminate!(exception = Asked) ⇒ Object
Terminates the task if it is running.
-
#terminated? ⇒ Boolean
Checks if the task was terminated.
-
#time_elapsed(time = Time.now) ⇒ Object
Returns the number of seconds the task is or was running at the given point in time.
Constructor Details
#initialize(options = Hash.new, *args, &block) ⇒ Task
A new task which can be added to the work queue of a Utilrb::ThreadPool. If a sync key is given, no other task having the same key will be executed in parallel. This is useful for calls on an instance whose methods are not thread safe.
# File 'lib/utilrb/thread_pool.rb', line 127
def initialize (options = Hash.new,*args, &block)
  unless block
    raise ArgumentError, 'you must pass a work block to initialize a new Task.'
  end
  options = Kernel.(options,{:sync_key => nil,:default => nil,:callback => nil})
  @sync_key = options[:sync_key]
  @arguments = args
  @default = options[:default]
  @callback = options[:callback]
  @block = block
  @mutex = Mutex.new
  @pool = nil
  @state_temp = nil
  @state = nil
  reset
end
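The sync key guarantee can be illustrated in isolation. The following is only a rough sketch, assuming a simple lock-per-key approach; it is not how Utilrb::ThreadPool schedules tasks, and `KeyedRunner` and its `run` method are hypothetical names invented for this example.

```ruby
# Hypothetical stand-in, not part of utilrb: serialize work by sync key.
class KeyedRunner
  def initialize
    @locks = Hash.new         # sync_key => Mutex
    @locks_guard = Mutex.new  # protects @locks itself
  end

  # Runs the block. Blocks sharing a non-nil sync_key never overlap.
  def run(sync_key = nil)
    return yield if sync_key.nil?
    lock = @locks_guard.synchronize { @locks[sync_key] ||= Mutex.new }
    lock.synchronize { yield }
  end
end

runner = KeyedRunner.new
active = 0        # number of blocks currently inside the keyed section
max_active = 0    # highest value of active ever observed
counter_guard = Mutex.new

threads = 4.times.map do
  Thread.new do
    runner.run(:shared_resource) do
      counter_guard.synchronize do
        active += 1
        max_active = [max_active, active].max
      end
      sleep 0.01
      counter_guard.synchronize { active -= 1 }
    end
  end
end
threads.each(&:join)
```

With a shared key, `max_active` never exceeds 1. A real pool would more likely keep a task with a busy key in the queue than block a worker thread on a lock, so treat this purely as a demonstration of the guarantee.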
Instance Attribute Details
#description ⇒ Object
Custom description which can be used to store a human readable object
# File 'lib/utilrb/thread_pool.rb', line 76
def description
  @description
end
#exception ⇒ Exception (readonly)
The exception thrown by the custom code block
# File 'lib/utilrb/thread_pool.rb', line 49
def exception
  @exception
end
#pool ⇒ ThreadPool (readonly)
Thread pool the task belongs to
# File 'lib/utilrb/thread_pool.rb', line 39
def pool
  @pool
end
#queued_at ⇒ Object
The time the task was queued
Returns [Time] the time.
# File 'lib/utilrb/thread_pool.rb', line 59
def queued_at
  @queued_at
end
#result ⇒ Object (readonly)
Result of the code block call
# File 'lib/utilrb/thread_pool.rb', line 72
def result
  @result
end
#started_at ⇒ Object (readonly)
The time the task was started
Returns [Time] the time.
# File 'lib/utilrb/thread_pool.rb', line 64
def started_at
  @started_at
end
#state ⇒ :waiting, ... (readonly)
State of the task
# File 'lib/utilrb/thread_pool.rb', line 44
def state
  @state
end
#stopped_at ⇒ Object (readonly)
The time the task was stopped or finished
Returns [Time] the time.
# File 'lib/utilrb/thread_pool.rb', line 69
def stopped_at
  @stopped_at
end
#sync_key ⇒ Object (readonly)
The sync key specifies that a task must not run in parallel with another task having the same sync key. If no key is set, there is no such constraint on the task.
# File 'lib/utilrb/thread_pool.rb', line 34
def sync_key
  @sync_key
end
#thread ⇒ Object (readonly)
The thread the task was assigned to
Returns [Thread] the thread.
# File 'lib/utilrb/thread_pool.rb', line 54
def thread
  @thread
end
Instance Method Details
#callback {|Object, Exception| ... } ⇒ Object
Called from the worker thread when the work is done
# File 'lib/utilrb/thread_pool.rb', line 230
def callback(&block)
  @mutex.synchronize do
    @callback = block
  end
end
#default? ⇒ Boolean
Returns true if the task has a default return value.
# File 'lib/utilrb/thread_pool.rb', line 163
def default?
  @mutex.synchronize do
    @default != nil
  end
end
#exception? ⇒ Boolean
Checks if an exception occurred.
# File 'lib/utilrb/thread_pool.rb', line 114
def exception?; @state == :exception; end
#execute ⇒ Object
Executes the task. Should be called from a worker thread after pre_execute has been called. After execute has returned and the task has been removed from any internal list, finalize must be called to propagate the task state.
# File 'lib/utilrb/thread_pool.rb', line 187
def execute()
  raise RuntimeError, "call pre_execute ThreadPool::Task first. Current state is #{@state} but :running was expected" if @state != :running
  @state_temp = begin
    @result = @block.call(*@arguments)
    :finished
  rescue Exception => e
    @exception = e
    if e.is_a? Asked
      :terminated
    else
      :exception
    end
  end
  @stopped_at = Time.now
end
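The call order described above (pre_execute, then execute, then finalize) and the way the block's outcome maps to a final state can be sketched without the library. This is a simplified stand-in, assuming no locking, timestamps or pool bookkeeping; `SketchTask` and `run_sketch` are hypothetical names, and only the `Asked` definition is taken from the documented constant.

```ruby
# Hypothetical stand-in mirroring the documented call order; not the utilrb class.
Asked = Class.new(Exception) # same definition as the documented constant

class SketchTask
  attr_reader :state, :result, :exception

  def initialize(*args, &block)
    raise ArgumentError, 'a work block is required' unless block
    @args = args
    @block = block
    @state = :waiting
    @state_temp = nil
  end

  def pre_execute
    @state = :running
  end

  # Runs the block and remembers the outcome without publishing it yet.
  def execute
    raise 'call pre_execute first' unless @state == :running
    @state_temp = begin
      @result = @block.call(*@args)
      :finished
    rescue Exception => e
      @exception = e
      e.is_a?(Asked) ? :terminated : :exception
    end
  end

  # Publishes the outcome recorded by execute.
  def finalize
    @state = @state_temp
  end
end

def run_sketch(task)
  task.pre_execute
  task.execute
  task.finalize
  task
end

ok     = run_sketch(SketchTask.new(2, 3) { |a, b| a + b })
failed = run_sketch(SketchTask.new { raise ArgumentError, 'boom' })
asked  = run_sketch(SketchTask.new { raise Asked })
```

Here `ok` ends in `:finished`, `failed` in `:exception` and `asked` in `:terminated`. Keeping the outcome in a temporary variable until finalize means observers keep seeing `:running` until the caller decides to publish the result.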
#finalize ⇒ Object
Propagates the task's state. Should be called after execute.
# File 'lib/utilrb/thread_pool.rb', line 205
def finalize
  @mutex.synchronize do
    @thread = nil
    @state = @state_temp
    @pool = nil
  end
  begin
    @callback.call @result,@exception if @callback
  rescue Exception => e
    ThreadPool.report_exception("thread_pool: in #{self}, callback #{@callback} failed", e)
  end
end
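The callback handling in finalize has two properties worth seeing in isolation: the callback receives the result and the exception, and an error raised inside the callback is reported instead of propagating. A minimal sketch, assuming an array as a stand-in for ThreadPool.report_exception; `notify` is a hypothetical helper, not a utilrb method.

```ruby
# Hypothetical helper, not utilrb API: invoke a completion callback safely.
def notify(callback, result, exception, errors)
  callback.call(result, exception) if callback
rescue Exception => e
  # Stand-in for ThreadPool.report_exception: record instead of re-raising.
  errors << "callback failed: #{e.message}"
end

errors = []
seen = []

notify(->(res, err) { seen << [res, err] }, 42, nil, errors)      # normal callback
notify(->(_res, _err) { raise 'bad callback' }, 42, nil, errors)  # failing callback
notify(nil, 42, nil, errors)                                      # no callback set
```

After these calls, `seen` holds one `[result, exception]` pair and `errors` holds one message, so a broken callback cannot take down the worker thread that runs it.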
#finished? ⇒ Boolean
Checks if the task was stopped or finished. This also includes cases where an exception was raised by the custom code block.
# File 'lib/utilrb/thread_pool.rb', line 98
def finished?; started? && !running? && !stopping?; end
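Because finished? is defined by exclusion, it helps to check which states it actually covers. The sketch below restates the documented one-line predicates as functions of a state symbol; `StatePredicates` is a hypothetical module, and the state list is limited to the symbols that appear in the source on this page.

```ruby
# Hypothetical module restating the documented predicates on a bare symbol.
module StatePredicates
  def self.started?(state);  state != :waiting;  end
  def self.running?(state);  state == :running;  end
  def self.stopping?(state); state == :stopping; end

  def self.finished?(state)
    started?(state) && !running?(state) && !stopping?(state)
  end
end

states = [:waiting, :running, :stopping, :finished, :terminated, :exception]
finished_states = states.select { |s| StatePredicates.finished?(s) }
```

`finished_states` contains `:finished`, `:terminated` and `:exception`, which matches the statement that finished? also covers tasks whose block raised.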
#pre_execute(pool = nil) ⇒ Object
Sets all internal state to running. Call execute after that.
# File 'lib/utilrb/thread_pool.rb', line 171
def pre_execute(pool=nil)
  @mutex.synchronize do
    #store current thread to be able to terminate
    #the thread
    @pool = pool
    @thread = Thread.current
    @started_at = Time.now
    @state = :running
  end
end
#reset ⇒ Object
Resets the task. This can be used to requeue a task that has already finished. Raises a RuntimeError if the task has been started but has not finished yet.
# File 'lib/utilrb/thread_pool.rb', line 146
def reset
  if finished? || !started?
    @mutex.synchronize do
      @result = @default
      @state = :waiting
      @exception = nil
      @started_at = nil
      @queued_at = nil
      @stopped_at = nil
    end
  else
    raise RuntimeError,"cannot reset a task which is not finished"
  end
end
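The reset guard and its interaction with the default value can be tried on a small stand-in. This is a sketch under simplifying assumptions (no mutex, no timestamps, and a writable state for demonstration only); `ResettableSketch` is a hypothetical class, not the utilrb Task.

```ruby
# Hypothetical stand-in for the documented reset guard; not the utilrb class.
class ResettableSketch
  attr_accessor :state # writable only so the example can force a state
  attr_reader :result

  def initialize(default = nil)
    @default = default
    @state = :waiting
    @result = default
  end

  def finished?
    ![:waiting, :running, :stopping].include?(@state)
  end

  # Allowed before the task started or after it finished, as in the source.
  def reset
    unless finished? || @state == :waiting
      raise RuntimeError, 'cannot reset a task which is not finished'
    end
    @result = @default
    @state = :waiting
  end
end

task = ResettableSketch.new(:no_result_yet)
task.state = :finished
task.reset
after_reset = [task.state, task.result]

task.state = :running
reset_error = begin
  task.reset
  nil
rescue RuntimeError => e
  e.message
end
```

A finished task goes back to `:waiting` with its result set to the default, while resetting a running task raises.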
#running? ⇒ Boolean
Checks if the task is running
# File 'lib/utilrb/thread_pool.rb', line 86
def running?; @state == :running; end
#started? ⇒ Boolean
Checks if the task was started
# File 'lib/utilrb/thread_pool.rb', line 81
def started?; @state != :waiting; end
#stopping? ⇒ Boolean
Checks if the task is going to be stopped
# File 'lib/utilrb/thread_pool.rb', line 91
def stopping?; @state == :stopping; end
#successfull? ⇒ Boolean
Checks if the task was successfully finished. This means that no exception, termination or timeout occurred.
# File 'lib/utilrb/thread_pool.rb', line 104
def successfull?; @state == :finished; end
#terminate!(exception = Asked) ⇒ Object
Terminates the task if it is running
# File 'lib/utilrb/thread_pool.rb', line 219
def terminate!(exception = Asked)
  @mutex.synchronize do
    return unless running?
    @state = :stopping
    @thread.raise exception
  end
end
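terminate! relies on Thread#raise to inject an exception into the worker thread, which execute then maps to :terminated. The mechanism can be shown with plain threads. In this sketch `StopRequest` is a hypothetical stand-in for the documented Asked constant, and the Queue is only there to make the example deterministic.

```ruby
# Hypothetical stand-in for Asked; derives from Exception like the original.
StopRequest = Class.new(Exception)

ready = Queue.new
outcome = nil

worker = Thread.new do
  outcome = begin
    ready << true   # signal that the rescue clause is now in effect
    sleep           # stands in for long-running work
    :finished
  rescue StopRequest
    :terminated
  end
end

ready.pop                 # wait until the worker is inside begin/rescue
worker.raise StopRequest  # the same mechanism terminate! uses via @thread.raise
worker.join
```

`outcome` ends up as `:terminated`. Because the exception class derives from Exception rather than StandardError, a bare `rescue` inside the work block will not swallow the termination request.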
#terminated? ⇒ Boolean
Checks if the task was terminated.
# File 'lib/utilrb/thread_pool.rb', line 109
def terminated?; @state == :terminated; end
#time_elapsed(time = Time.now) ⇒ Object
Returns the number of seconds the task is or was running at the given point in time
# File 'lib/utilrb/thread_pool.rb', line 241
def time_elapsed(time = Time.now)
  #no need to synchronize here
  if running?
    (time-@started_at).to_f
  elsif finished?
    (@stopped_at-@started_at).to_f
  else
    0
  end
end
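The three branches of time_elapsed can be checked with fixed timestamps. The sketch below restates them as a free function over explicit arguments; `elapsed_seconds` is a hypothetical name, and the times are arbitrary values chosen for the example.

```ruby
# Hypothetical free function mirroring the documented branches.
def elapsed_seconds(state, started_at, stopped_at, now)
  case state
  when :running
    (now - started_at).to_f           # still running: measure up to "now"
  when :finished, :terminated, :exception
    (stopped_at - started_at).to_f    # done: measure the actual run time
  else
    0                                 # :waiting or :stopping
  end
end

t0 = Time.at(1_000)
running  = elapsed_seconds(:running,  t0,  nil,    t0 + 2.5)
finished = elapsed_seconds(:finished, t0,  t0 + 4, t0 + 60)
waiting  = elapsed_seconds(:waiting,  nil, nil,    t0)
```

Note that a task in the `:stopping` state is neither running nor finished, so the documented method returns 0 for it until finalize has published the final state.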