Class: Utilrb::ThreadPool
Overview
ThreadPool implementation inspired by github.com/meh/ruby-threadpool
Defined Under Namespace
Classes: Task
Instance Attribute Summary
- #auto_trim ⇒ Boolean
  Auto trim automatically reduces the number of worker threads if there are too many threads waiting for work.
- #avg_run_time ⇒ Float (readonly)
  The average execution time of a (running) task.
- #avg_wait_time ⇒ Float (readonly)
  The average waiting time of a task before being executed.
- #max ⇒ Fixnum
  The maximum number of worker threads.
- #min ⇒ Fixnum
  The minimum number of worker threads.
- #spawned ⇒ Fixnum (readonly)
  The actual number of worker threads spawned.
- #waiting ⇒ Fixnum (readonly)
  The number of worker threads waiting for work.
Instance Method Summary
- #<<(task) ⇒ Task
  Processes the given Task as soon as the next thread is available.
- #backlog ⇒ Fixnum
  Number of tasks waiting for execution.
- #clear ⇒ Object
- #initialize(min = 5, max = min) ⇒ ThreadPool (constructor)
  Creates a ThreadPool that starts with min worker threads and grows to at most max.
- #join ⇒ Object
  Blocks until all threads have terminated.
- #on_task_finished {|Task| ... } ⇒ Object
  Calls the given block for every finished task, even if the task was terminated.
- #process(*args) {|*args| ... } ⇒ Task
  Processes the given block as soon as the next thread is available.
- #process? ⇒ Boolean
  Returns true if a worker thread is currently processing a task or work is still queued.
- #process_with_options(options, *args, &block) ⇒ Task
  Processes the given block as soon as the next thread is available with the given options.
- #resize(min, max = nil) ⇒ Object
  Changes the minimum and maximum number of threads.
- #shutdown ⇒ Object
  Shuts down all threads.
- #shutdown? ⇒ boolean
  Checks if the thread pool is shutting down all threads.
- #sync(sync_key, *args) {|*args| ... } ⇒ Object
  Processes the given block in the current thread but ensures that, during processing, no worker thread executes a task which has the same sync_key.
- #sync_keys ⇒ Object
  Returns the sync_keys currently in use.
- #tasks ⇒ Array<Task>
  Returns an array of the currently waiting and running tasks.
- #trim(force = false) ⇒ Object
  Trims the number of threads if threads are waiting for work and the number of spawned threads is higher than the minimum number.
Constructor Details
#initialize(min = 5, max = min) ⇒ ThreadPool
Creates a ThreadPool that starts with min worker threads and grows to at most max.

    # File 'lib/utilrb/thread_pool.rb', line 292
    def initialize(min = 5, max = min)
      @min = min
      @max = max
      @cond = ConditionVariable.new
      @cond_sync_key = ConditionVariable.new
      @mutex = Mutex.new
      @tasks_waiting = [] # tasks waiting for execution
      @tasks_running = [] # tasks which are currently running

      # Statistics
      @avg_run_time = 0  # average run time of a task in s [Float]
      @avg_wait_time = 0 # average time a task has to wait for execution in s [Float]

      @workers = [] # thread pool
      @spawned = 0
      @waiting = 0
      @shutdown = false
      @callback_on_task_finished = nil
      @pipes = nil
      @sync_keys = Set.new
      @trim_requests = 0
      @auto_trim = false

      @mutex.synchronize do
        min.times do
          spawn_thread
        end
      end
    end
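The constructor pairs a Mutex with a ConditionVariable and starts min workers up front. The following is a minimal sketch of that pattern using only the Ruby standard library. TinyPool and shutdown_and_join are hypothetical names, not part of Utilrb, and details such as draining the queue before exit are assumptions of this sketch rather than documented behavior of Utilrb::ThreadPool.

```ruby
# TinyPool is a hypothetical, stripped-down illustration; it is not Utilrb::ThreadPool.
class TinyPool
  def initialize(min = 2)
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @queue = []       # blocks waiting for execution
    @shutdown = false
    # Start the initial workers right away, as the constructor above does.
    @workers = Array.new(min) { Thread.new { worker_loop } }
  end

  # Queue a block and wake one sleeping worker.
  def process(&block)
    @mutex.synchronize do
      raise "unable to add work while shutting down" if @shutdown
      @queue << block
      @cond.signal
    end
    self
  end

  # Assumption of this sketch: workers drain the queue, then exit.
  def shutdown_and_join
    @mutex.synchronize do
      @shutdown = true
      @cond.broadcast
    end
    @workers.each(&:join)
    self
  end

  private

  def worker_loop
    loop do
      job = @mutex.synchronize do
        # Sleep until there is work or a shutdown was requested.
        @cond.wait(@mutex) while @queue.empty? && !@shutdown
        @queue.shift
      end
      break if job.nil? # queue is empty and shutdown was requested
      job.call
    end
  end
end

squares = []
lock = Mutex.new
pool = TinyPool.new(2)
5.times { |i| pool.process { lock.synchronize { squares << i * i } } }
pool.shutdown_and_join
p squares.sort # [0, 1, 4, 9, 16]
```

The wait sits in a loop because a woken thread has to re-check its condition: another worker may already have taken the job.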
Instance Attribute Details
#auto_trim ⇒ Boolean
Auto trim automatically reduces the number of worker threads if there are too many threads waiting for work.
    # File 'lib/utilrb/thread_pool.rb', line 286
    def auto_trim
      @auto_trim
    end
#avg_run_time ⇒ Float (readonly)
The average execution time of a (running) task.
    # File 'lib/utilrb/thread_pool.rb', line 276
    def avg_run_time
      @avg_run_time
    end
#avg_wait_time ⇒ Float (readonly)
The average waiting time of a task before being executed.
    # File 'lib/utilrb/thread_pool.rb', line 281
    def avg_wait_time
      @avg_wait_time
    end
#max ⇒ Fixnum
The maximum number of worker threads.
    # File 'lib/utilrb/thread_pool.rb', line 261
    def max
      @max
    end
#min ⇒ Fixnum
The minimum number of worker threads.
    # File 'lib/utilrb/thread_pool.rb', line 256
    def min
      @min
    end
#spawned ⇒ Fixnum (readonly)
The actual number of worker threads spawned.

    # File 'lib/utilrb/thread_pool.rb', line 266
    def spawned
      @spawned
    end
#waiting ⇒ Fixnum (readonly)
The number of worker threads waiting for work.
    # File 'lib/utilrb/thread_pool.rb', line 271
    def waiting
      @waiting
    end
Instance Method Details
#<<(task) ⇒ Task
Processes the given Task as soon as the next thread is available.

    # File 'lib/utilrb/thread_pool.rb', line 455
    def <<(task)
      raise "cannot add task #{task} it is still running" if task.thread
      task.reset if task.finished?
      @mutex.synchronize do
        if shutdown?
          raise "unable to add work while shutting down"
        end
        task.queued_at = Time.now
        @tasks_waiting << task
        if @waiting == 0 && @spawned < @max
          spawn_thread
        end
        @cond.signal
      end
      task
    end
#backlog ⇒ Fixnum
Number of tasks waiting for execution.

    # File 'lib/utilrb/thread_pool.rb', line 374
    def backlog
      @mutex.synchronize do
        @tasks_waiting.length
      end
    end
#clear ⇒ Object
    # File 'lib/utilrb/thread_pool.rb', line 342
    def clear
      shutdown
      join
    rescue Exception
    ensure
      @shutdown = false
    end
#join ⇒ Object
Blocks until all threads have terminated. This does not terminate any thread by itself and will block forever if shutdown was not called.

    # File 'lib/utilrb/thread_pool.rb', line 500
    def join
      @workers.first.join until @workers.empty?
      self
    end
#on_task_finished {|Task| ... } ⇒ Object
Calls the given block for every finished task, even if the task was terminated.
This can be used to store the result for an event loop.

    # File 'lib/utilrb/thread_pool.rb', line 511
    def on_task_finished(&block)
      @mutex.synchronize do
        @callback_on_task_finished = block
      end
    end
#process(*args) {|*args| ... } ⇒ Task
Processes the given block as soon as the next thread is available.
    # File 'lib/utilrb/thread_pool.rb', line 394
    def process(*args, &block)
      process_with_options(nil, *args, &block)
    end
#process? ⇒ Boolean
Returns true if a worker thread is currently processing a task or work is still queued.

    # File 'lib/utilrb/thread_pool.rb', line 402
    def process?
      @mutex.synchronize do
        waiting != spawned || @tasks_waiting.length > 0
      end
    end
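Read literally, the listing reports true when at least one worker is not idle, or when tasks are still queued. The predicate can be restated as a pure function for illustration. pool_busy? and its parameter names are hypothetical and stand in for the pool's internal counters.

```ruby
# Hypothetical restatement of the expression inside #process?.
# waiting: idle workers, spawned: all workers, backlog: queued tasks.
def pool_busy?(waiting, spawned, backlog)
  waiting != spawned || backlog > 0
end

puts pool_busy?(5, 5, 0) # false: every worker is idle and nothing is queued
puts pool_busy?(4, 5, 0) # true: one worker is not idle
puts pool_busy?(5, 5, 2) # true: tasks are queued
```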
#process_with_options(options, *args, &block) ⇒ Task
Processes the given block as soon as the next thread is available with the given options.
    # File 'lib/utilrb/thread_pool.rb', line 414
    def process_with_options(options, *args, &block)
      task = Task.new(options, *args, &block)
      self << task
      task
    end
#resize(min, max = nil) ⇒ Object
Changes the minimum and maximum number of threads.

    # File 'lib/utilrb/thread_pool.rb', line 359
    def resize(min, max = nil)
      @mutex.synchronize do
        @min = min
        @max = max || min
        count = [@tasks_waiting.size, @max].min
        0.upto(count) do
          spawn_thread
        end
      end
      trim true
    end
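One detail of the listing is worth knowing: Integer#upto includes both end points, so 0.upto(count) runs its block count + 1 times, whereas count.times runs it count times. The source does not say whether the extra spawn_thread call is intended. The helpers below are hypothetical and only count iterations.

```ruby
# Count how many times each loop form runs its block.
def iterations_upto(count)
  n = 0
  0.upto(count) { n += 1 }
  n
end

def iterations_times(count)
  n = 0
  count.times { n += 1 }
  n
end

puts iterations_upto(3)  # 4
puts iterations_times(3) # 3
```

With an empty backlog, count is 0 and the loop still runs once.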
#shutdown ⇒ Object
Shuts down all threads.
    # File 'lib/utilrb/thread_pool.rb', line 488
    def shutdown()
      tasks = nil
      @mutex.synchronize do
        @shutdown = true
      end
      @cond.broadcast
    end
#shutdown? ⇒ boolean
Checks if the thread pool is shutting down all threads.
    # File 'lib/utilrb/thread_pool.rb', line 353
    def shutdown?; @shutdown; end
#sync(sync_key, *args) {|*args| ... } ⇒ Object
Processes the given block in the current thread but ensures that, during processing, no worker thread executes a task which has the same sync_key.
This is useful for instance member calls which are not thread safe.

    # File 'lib/utilrb/thread_pool.rb', line 430
    def sync(sync_key, *args, &block)
      raise ArgumentError, "no sync key" unless sync_key

      @mutex.synchronize do
        while(!@sync_keys.add?(sync_key))
          @cond_sync_key.wait @mutex # wait until someone has removed a key
        end
      end
      begin
        result = block.call(*args)
      ensure
        @mutex.synchronize do
          @sync_keys.delete sync_key
        end
        @cond_sync_key.signal
        @cond.signal # worker threads are just waiting for work no matter if it is
        # because of a deletion of a sync_key or a task was added
      end
      result
    end
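The key bookkeeping relies on Set#add?, which returns nil when the element is already present, so the caller waits until the key has been released. A self-contained sketch of just that mechanism follows. KeyGuard is a hypothetical class, and using broadcast instead of signal is a choice made for this sketch, not something taken from the listing.

```ruby
require "set"

# KeyGuard is a hypothetical name; it isolates the sync-key bookkeeping only.
class KeyGuard
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @keys = Set.new
  end

  # Run the block while holding +key+; callers using the same key take turns.
  def sync(key, *args)
    raise ArgumentError, "no sync key" unless key
    @mutex.synchronize do
      # Set#add? returns nil if the key is already held, so keep waiting.
      @cond.wait(@mutex) until @keys.add?(key)
    end
    begin
      yield(*args)
    ensure
      @mutex.synchronize do
        @keys.delete(key)
        @cond.broadcast # sketch choice: wake all waiters so each re-checks its key
      end
    end
  end

  # Snapshot of the keys currently held.
  def keys
    @mutex.synchronize { @keys.dup }
  end
end

guard = KeyGuard.new
counter = 0
threads = 4.times.map do
  Thread.new { 100.times { guard.sync(:counter) { counter += 1 } } }
end
threads.each(&:join)
puts counter # 400
```

Blocks run with different keys do not wait for each other; only callers that share a key are serialized.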
#sync_keys ⇒ Object
Returns the sync_keys currently in use.

    # File 'lib/utilrb/thread_pool.rb', line 336
    def sync_keys
      @mutex.synchronize do
        @sync_keys.clone
      end
    end
#tasks ⇒ Array<Task>
Returns an array of the currently waiting and running tasks.

    # File 'lib/utilrb/thread_pool.rb', line 383
    def tasks
      @mutex.synchronize do
        @tasks_running.dup + @tasks_waiting.dup
      end
    end
#trim(force = false) ⇒ Object
Trims the number of threads if threads are waiting for work and the number of spawned threads is higher than the minimum number.
    # File 'lib/utilrb/thread_pool.rb', line 476
    def trim(force = false)
      @mutex.synchronize do
        if (force || @waiting > 0) && @spawned - @trim_requests > @min
          @trim_requests += 1
          @cond.signal
        end
      end
      self
    end
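The guard in the listing has two parts: force bypasses the check for waiting threads, and pending trim requests are subtracted from spawned so that repeated calls cannot push the pool below min. As a rough illustration, the condition can be written as a pure function. trim_allowed? and its keyword names are hypothetical.

```ruby
# Hypothetical pure-function restatement of the guard in #trim.
def trim_allowed?(force:, waiting:, spawned:, trim_requests:, min:)
  (force || waiting > 0) && spawned - trim_requests > min
end

# Repeated forced trims on a pool with 8 threads and a minimum of 5:
requests = 0
requests += 1 while trim_allowed?(force: true, waiting: 0, spawned: 8,
                                  trim_requests: requests, min: 5)
puts requests # 3
```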