Class: Utilrb::ThreadPool
Overview
ThreadPool implementation inspired by github.com/meh/ruby-threadpool
Defined Under Namespace
Classes: Task
Instance Attribute Summary
-
#auto_trim ⇒ Boolean
Auto trim automatically reduces the number of worker threads if there are too many threads waiting for work.
-
#avg_run_time ⇒ Float
readonly
The average execution time of a (running) task.
-
#avg_wait_time ⇒ Float
readonly
The average waiting time of a task before being executed.
-
#max ⇒ Fixnum
The maximum number of worker threads.
-
#min ⇒ Fixnum
The minimum number of worker threads.
-
#spawned ⇒ Fixnum
readonly
The real number of worker threads.
-
#waiting ⇒ Fixnum
readonly
The number of worker threads waiting for work.
Instance Method Summary
-
#<<(task) ⇒ Task
Processes the given Task as soon as the next thread is available.
-
#backlog ⇒ Fixnum
Number of tasks waiting for execution.
- #clear ⇒ Object
-
#initialize(min = 5, max = min) ⇒ ThreadPool
constructor
Creates a ThreadPool and spawns min worker threads; max defaults to min.
-
#join ⇒ Object
Blocks until all threads have terminated.
-
#on_task_finished {|Task| ... } ⇒ Object
The given code block is called for every task that has finished, even if it was terminated.
-
#process(*args) {|*args| ... } ⇒ Task
Processes the given block as soon as the next thread is available.
-
#process? ⇒ Boolean
Returns true if a worker thread is currently processing a task or work is queued.
-
#process_with_options(options, *args, &block) ⇒ Task
Processes the given block as soon as the next thread is available with the given options.
-
#resize(min, max = nil) ⇒ Object
Changes the minimum and maximum number of threads.
-
#shutdown ⇒ Object
Shuts down all threads.
-
#shutdown? ⇒ boolean
Checks if the thread pool is shutting down all threads.
-
#sync(sync_key, *args) {|*args| ... } ⇒ Object
Processes the given block from the current thread but ensures that, during processing, no worker thread is executing a task which has the same sync_key.
-
#sync_keys ⇒ Object
Returns the sync_keys currently in use.
-
#sync_timeout(sync_key, timeout, *args) {|*args| ... } ⇒ Object
Same as sync, but raises Timeout::Error if sync_key cannot be obtained within the given timeout.
-
#tasks ⇒ Array<Task>
Returns an array of the current waiting and running tasks.
-
#trim(force = false) ⇒ Object
Trims the number of threads if threads are waiting for work and the number of spawned threads is higher than the minimum number.
Constructor Details
#initialize(min = 5, max = min) ⇒ ThreadPool
Creates a ThreadPool and spawns min worker threads; max defaults to min.

# File 'lib/utilrb/thread_pool.rb', line 293

def initialize (min = 5, max = min)
  @min = min
  @max = max
  @cond = ConditionVariable.new
  @cond_sync_key = ConditionVariable.new
  @mutex = Mutex.new
  @tasks_waiting = []         # tasks waiting for execution
  @tasks_running = []         # tasks which are currently running

  # Statistics
  @avg_run_time = 0           # average run time of a task in s [Float]
  @avg_wait_time = 0          # average time a task has to wait for execution in s [Float]

  @workers = []               # thread pool
  @spawned = 0
  @waiting = 0
  @shutdown = false
  @callback_on_task_finished = nil
  @pipes = nil
  @sync_keys = Set.new

  @trim_requests = 0
  @auto_trim = false

  @mutex.synchronize do
    min.times do
      spawn_thread
    end
  end
end
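The constructor's overall shape (a mutex, a condition variable, a list of waiting work, and min threads spawned up front) can be illustrated with a stand-alone sketch. This is not the Utilrb source: `TinyPool` and its private `spawn_thread` are hypothetical names, and the worker loop and shutdown behaviour (draining the queue before exiting) are assumptions made for the example.

```ruby
# Stand-alone sketch, NOT the Utilrb implementation. TinyPool is a hypothetical name.
class TinyPool
  attr_reader :spawned

  def initialize(min = 5)
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @jobs = []          # blocks waiting for execution
    @workers = []
    @spawned = 0
    @shutdown = false
    @mutex.synchronize { min.times { spawn_thread } }
  end

  # Queue a block and wake one idle worker.
  def process(&block)
    @mutex.synchronize do
      raise "unable to add work while shutting down" if @shutdown
      @jobs << block
      @cond.signal
    end
  end

  # Ask every worker to exit once the queue is empty (assumed behaviour).
  def shutdown
    @mutex.synchronize do
      @shutdown = true
      @cond.broadcast
    end
  end

  # Block until all workers have terminated.
  def join
    @workers.each(&:join)
    self
  end

  private

  # Must be called with @mutex held.
  def spawn_thread
    @spawned += 1
    @workers << Thread.new do
      loop do
        job = @mutex.synchronize do
          @cond.wait(@mutex) while @jobs.empty? && !@shutdown
          @jobs.shift
        end
        break unless job # queue is empty and shutdown was requested
        job.call
      end
    end
  end
end

pool = TinyPool.new(2)
results = Queue.new
4.times { |i| pool.process { results << i * i } }
pool.shutdown
pool.join
squares = Array.new(results.size) { results.pop }.sort
```

Spawning inside `@mutex.synchronize` mirrors the listing above: new workers cannot inspect shared state until the constructor has finished setting it up.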
Instance Attribute Details
#auto_trim ⇒ Boolean
Auto trim automatically reduces the number of worker threads if there are too many threads waiting for work.
# File 'lib/utilrb/thread_pool.rb', line 287

def auto_trim
  @auto_trim
end
#avg_run_time ⇒ Float (readonly)
The average execution time of a (running) task.
# File 'lib/utilrb/thread_pool.rb', line 277

def avg_run_time
  @avg_run_time
end
#avg_wait_time ⇒ Float (readonly)
The average waiting time of a task before being executed.
# File 'lib/utilrb/thread_pool.rb', line 282

def avg_wait_time
  @avg_wait_time
end
#max ⇒ Fixnum
The maximum number of worker threads.
# File 'lib/utilrb/thread_pool.rb', line 262

def max
  @max
end
#min ⇒ Fixnum
The minimum number of worker threads.
# File 'lib/utilrb/thread_pool.rb', line 257

def min
  @min
end
#spawned ⇒ Fixnum (readonly)
The real number of worker threads.
# File 'lib/utilrb/thread_pool.rb', line 267

def spawned
  @spawned
end
#waiting ⇒ Fixnum (readonly)
The number of worker threads waiting for work.
# File 'lib/utilrb/thread_pool.rb', line 272

def waiting
  @waiting
end
Instance Method Details
#<<(task) ⇒ Task
Processes the given Task as soon as the next thread is available.

# File 'lib/utilrb/thread_pool.rb', line 486

def <<(task)
  raise "cannot add task #{task} it is still running" if task.thread
  task.reset if task.finished?
  @mutex.synchronize do
    if shutdown?
      raise "unable to add work while shutting down"
    end
    task.queued_at = Time.now
    @tasks_waiting << task
    if @waiting <= @tasks_waiting.size && @spawned < @max
      spawn_thread
    end
    @cond.signal
  end
  task
end
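The condition that decides whether `#<<` spawns an additional worker can be isolated as a pure predicate. The helper below is a hypothetical illustration (its name and keyword arguments are not part of the library); note that in the listing above the check runs after the new task has been appended, so `backlog` already counts it.

```ruby
# Hypothetical helper mirroring the spawn condition used in #<<.
# waiting: idle workers, backlog: queued tasks (including the one just added),
# spawned: current number of workers, max: upper limit of workers.
def spawn_needed?(waiting:, backlog:, spawned:, max:)
  waiting <= backlog && spawned < max
end

at_limit    = spawn_needed?(waiting: 0, backlog: 1, spawned: 5, max: 5)
can_grow    = spawn_needed?(waiting: 0, backlog: 1, spawned: 2, max: 5)
enough_idle = spawn_needed?(waiting: 3, backlog: 1, spawned: 3, max: 5)
```

A new worker is only spawned when the idle workers do not outnumber the queued tasks and the pool is still below its maximum.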
#backlog ⇒ Fixnum
Number of tasks waiting for execution.

# File 'lib/utilrb/thread_pool.rb', line 375

def backlog
  @mutex.synchronize do
    @tasks_waiting.length
  end
end
#clear ⇒ Object
# File 'lib/utilrb/thread_pool.rb', line 343

def clear
  shutdown
  join
rescue Exception
ensure
  @shutdown = false
end
#join ⇒ Object
Blocks until all threads have terminated. This does not terminate any thread by itself and will block forever if shutdown was not called.

# File 'lib/utilrb/thread_pool.rb', line 529

def join
  while true
    if w = @mutex.synchronize { @workers.first }
      w.join
    else
      break
    end
  end
  self
end
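Why joining without a prior shutdown never returns can be seen in a small stand-alone sketch that uses only Ruby's standard `Thread`, `Mutex`, and `ConditionVariable`. The single worker and the `shutdown` flag are stand-ins chosen for the example, not the library's internals.

```ruby
# Stand-alone sketch: a worker that only exits once a shutdown flag is set.
mutex = Mutex.new
cond = ConditionVariable.new
shutdown = false

worker = Thread.new do
  mutex.synchronize { cond.wait(mutex) until shutdown }
end

# Thread#join with a limit returns nil when the thread is still running.
still_running = worker.join(0.1).nil?

# Request shutdown, then join: now the join can complete.
mutex.synchronize do
  shutdown = true
  cond.broadcast
end
worker.join
```

The same ordering applies to the pool: call shutdown first, then join.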
#on_task_finished {|Task| ... } ⇒ Object
The given code block is called for every task that has finished, even if it was terminated.
This can be used to store the result for an event loop.

# File 'lib/utilrb/thread_pool.rb', line 546

def on_task_finished (&block)
  @mutex.synchronize do
    @callback_on_task_finished = block
  end
end
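One way to picture "storing the result for an event loop" is a callback that pushes finished work onto a thread-safe `Queue` which the main thread drains later. The sketch below is stand-alone and simplified: plain threads stand in for the pool's workers, and the callback receives a plain value rather than a Task.

```ruby
# Stand-alone sketch: the lambda stands in for the block given to on_task_finished.
finished = Queue.new
callback = ->(result) { finished << result }

workers = 3.times.map do |i|
  Thread.new do
    value = i * 10        # pretend this is the task body
    callback.call(value)  # a pool would invoke the callback at this point
  end
end
workers.each(&:join)

# "Event loop" side: consume the stored results on the main thread only.
collected = []
collected << finished.pop until finished.empty?
```

Because the callback runs on a worker thread, handing results over through a `Queue` keeps the event loop's own state single-threaded.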
#process(*args) {|*args| ... } ⇒ Task
Processes the given block as soon as the next thread is available.
# File 'lib/utilrb/thread_pool.rb', line 395

def process (*args, &block)
  process_with_options(nil,*args,&block)
end
#process? ⇒ Boolean
Returns true if a worker thread is currently processing a task or work is queued.

# File 'lib/utilrb/thread_pool.rb', line 403

def process?
  @mutex.synchronize do
    waiting != spawned || @tasks_waiting.length > 0
  end
end
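The boolean expression in `process?` can be checked against a few cases with a hypothetical stand-alone predicate (the name `busy?` and its keyword arguments are illustrative only).

```ruby
# Hypothetical predicate mirroring the expression in process?.
def busy?(waiting:, spawned:, backlog:)
  waiting != spawned || backlog > 0
end

idle_pool      = busy?(waiting: 5, spawned: 5, backlog: 0)  # all workers idle, nothing queued
worker_running = busy?(waiting: 4, spawned: 5, backlog: 0)  # one worker is processing
work_queued    = busy?(waiting: 5, spawned: 5, backlog: 2)  # tasks queued, not yet picked up
```

Only the first case yields false: the pool is considered idle when every spawned worker is waiting and the backlog is empty.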
#process_with_options(options, *args, &block) ⇒ Task
Processes the given block as soon as the next thread is available with the given options.
# File 'lib/utilrb/thread_pool.rb', line 415

def process_with_options(options,*args, &block)
  task = Task.new(options,*args, &block)
  self << task
  task
end
#resize(min, max = nil) ⇒ Object
Changes the minimum and maximum number of threads.

# File 'lib/utilrb/thread_pool.rb', line 360

def resize (min, max = nil)
  @mutex.synchronize do
    @min = min
    @max = max || min
    count = [@tasks_waiting.size,@max - @spawned].min
    count.times do
      spawn_thread
    end
  end
  trim true
end
#shutdown ⇒ Object
Shuts down all threads.
# File 'lib/utilrb/thread_pool.rb', line 517

def shutdown
  tasks = nil
  @mutex.synchronize do
    @shutdown = true
    @cond.broadcast
  end
end
#shutdown? ⇒ boolean
Checks if the thread pool is shutting down all threads.
# File 'lib/utilrb/thread_pool.rb', line 354

def shutdown?; @shutdown; end
#sync(sync_key, *args) {|*args| ... } ⇒ Object
Processes the given block from the current thread but ensures that, during processing, no worker thread is executing a task which has the same sync_key.
This is useful for instance member calls which are not thread safe.

# File 'lib/utilrb/thread_pool.rb', line 431

def sync(sync_key,*args,&block)
  raise ArgumentError,"no sync key" unless sync_key

  @mutex.synchronize do
    while(!@sync_keys.add?(sync_key))
      @cond_sync_key.wait @mutex #wait until someone has removed a key
    end
  end
  begin
    result = block.call(*args)
  ensure
    @mutex.synchronize do
      @sync_keys.delete sync_key
    end
    @cond_sync_key.signal
    @cond.signal # worker threads are just waiting for work no matter if it is
    # because of a deletion of a sync_key or a task was added
  end
  result
end
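The key-based exclusion used by `sync` relies on `Set#add?` returning nil when the key is already present, combined with a condition variable for waiting. A minimal stand-alone sketch of that idea follows; `KeyGuard` is a hypothetical class, and it uses `broadcast` on release as a simplification rather than reproducing the library's signalling.

```ruby
require 'set'

# Stand-alone sketch of key-based mutual exclusion. KeyGuard is a hypothetical name.
class KeyGuard
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @keys = Set.new
  end

  # Runs the block while holding +key+; callers using the same key are serialized.
  def sync(key)
    raise ArgumentError, "no sync key" unless key
    @mutex.synchronize do
      # Set#add? returns nil if the key is already held, so wait and retry.
      @cond.wait(@mutex) until @keys.add?(key)
    end
    begin
      yield
    ensure
      @mutex.synchronize do
        @keys.delete(key)
        @cond.broadcast # wake waiters so they can retry add?
      end
    end
  end

  def keys
    @mutex.synchronize { @keys.dup }
  end
end

guard = KeyGuard.new
counter = 0
threads = 4.times.map do
  Thread.new do
    50.times do
      guard.sync(:counter) do
        current = counter
        Thread.pass # invite a context switch inside the critical section
        counter = current + 1
      end
    end
  end
end
threads.each(&:join)
```

Blocks running under different keys do not exclude each other; only holders of the same key are serialized.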
#sync_keys ⇒ Object
Returns the sync_keys currently in use.

# File 'lib/utilrb/thread_pool.rb', line 337

def sync_keys
  @mutex.synchronize do
    @sync_keys.clone
  end
end
#sync_timeout(sync_key, timeout, *args) {|*args| ... } ⇒ Object
Same as sync, but raises Timeout::Error if sync_key cannot be obtained within the given timeout.

# File 'lib/utilrb/thread_pool.rb', line 459

def sync_timeout(sync_key,timeout,*args,&block)
  raise ArgumentError,"no sync key" unless sync_key

  Timeout::timeout(timeout) do
    @mutex.synchronize do
      while(!@sync_keys.add?(sync_key))
        @cond_sync_key.wait @mutex #wait until someone has removed a key
      end
    end
  end
  begin
    result = block.call(*args)
  ensure
    @mutex.synchronize do
      @sync_keys.delete sync_key
    end
    @cond_sync_key.signal
    @cond.signal # worker threads are just waiting for work no matter if it is
    # because of a deletion of a sync_key or a task was added
  end
  result
end
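In the listing above, only the acquisition of the key is wrapped in the timeout; the block itself runs outside it. The stand-alone sketch below shows that acquisition step timing out when the key is never released. The local variables and the `:device` key are assumptions made for the example.

```ruby
require 'set'
require 'timeout'

# Stand-alone sketch: waiting for a key that is never released times out.
mutex = Mutex.new
cond = ConditionVariable.new
keys = Set.new([:device]) # :device is already held by someone else

outcome =
  begin
    Timeout.timeout(0.1) do
      mutex.synchronize do
        # Set#add? returns nil while the key is held, so keep waiting.
        cond.wait(mutex) until keys.add?(:device)
      end
    end
    :acquired
  rescue Timeout::Error
    :timed_out
  end
```

A caller would typically rescue `Timeout::Error` around the whole call and decide whether to retry or give up.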
#tasks ⇒ Array<Task>
Returns an array of the currently waiting and running tasks.

# File 'lib/utilrb/thread_pool.rb', line 384

def tasks
  @mutex.synchronize do
    @tasks_running.dup + @tasks_waiting.dup
  end
end
#trim(force = false) ⇒ Object
Trims the number of threads if threads are waiting for work and the number of spawned threads is higher than the minimum number.
# File 'lib/utilrb/thread_pool.rb', line 507

def trim (force = false)
  @mutex.synchronize do
    @trim_requests += 1
    @cond.signal
  end
  self
end