Class: Utilrb::ThreadPool

Inherits:
Object show all
Defined in:
lib/utilrb/thread_pool.rb

Overview

ThreadPool implementation inspired by github.com/meh/ruby-threadpool

Examples:

Using a thread pool of 10 threads

pool = ThreadPool.new(10)
0.upto(9) do 
   pool.process do 
     sleep 1
     puts "done"
   end
end
pool.shutdown
pool.join

Author:

Defined Under Namespace

Classes: Task

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(min = 5, max = min) ⇒ ThreadPool

A ThreadPool

Parameters:

  • min (Fixnum) (defaults to: 5)

    the minimum number of threads

  • max (Fixnum) (defaults to: min)

    the maximum number of threads



293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
# File 'lib/utilrb/thread_pool.rb', line 293

def initialize (min = 5, max = min)
    @min = min
    @max = max

    @cond = ConditionVariable.new
    @cond_sync_key = ConditionVariable.new
    @mutex = Mutex.new

    @tasks_waiting = []         # tasks waiting for execution
    @tasks_running = []         # tasks which are currently running
    
    # Statistics
    @avg_run_time = 0           # average run time of a task in s [Float]
    @avg_wait_time = 0          # average time a task has to wait for execution in s [Float]

    @workers = []               # thread pool
    @spawned = 0
    @waiting = 0
    @shutdown = false
    @callback_on_task_finished = nil
    @pipes = nil
    @sync_keys = Set.new

    @trim_requests = 0
    @auto_trim = false

    @mutex.synchronize do
        min.times do
            spawn_thread
        end
    end
end

Instance Attribute Details

#auto_trimBoolean

Auto trim automatically reduces the number of worker threads if there are too many threads waiting for work.

Returns:

  • (Boolean)


287
288
289
# File 'lib/utilrb/thread_pool.rb', line 287

def auto_trim
  @auto_trim
end

#avg_run_timeFloat (readonly)

The average execution time of a (running) task.

Returns:

  • (Float)


277
278
279
# File 'lib/utilrb/thread_pool.rb', line 277

def avg_run_time
  @avg_run_time
end

#avg_wait_timeFloat (readonly)

The average waiting time of a task before being executed.

Returns:

  • (Float)


282
283
284
# File 'lib/utilrb/thread_pool.rb', line 282

def avg_wait_time
  @avg_wait_time
end

#maxFixnum

The maximum number of worker threads.

Returns:

  • (Fixnum)


262
263
264
# File 'lib/utilrb/thread_pool.rb', line 262

def max
  @max
end

#minFixnum

The minimum number of worker threads.

Returns:

  • (Fixnum)


257
258
259
# File 'lib/utilrb/thread_pool.rb', line 257

def min
  @min
end

#spawnedFixnum (readonly)

The real number of worker threads.

Returns:

  • (Fixnum)


267
268
269
# File 'lib/utilrb/thread_pool.rb', line 267

def spawned
  @spawned
end

#waitingFixnum (readonly)

The number of worker threads waiting for work.

Returns:

  • (Fixnum)


272
273
274
# File 'lib/utilrb/thread_pool.rb', line 272

def waiting
  @waiting
end

Instance Method Details

#<<(task) ⇒ Task

Processes the given Task as soon as the next thread is available

Parameters:

  • task (Task)

    The task.

Returns:



486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
# File 'lib/utilrb/thread_pool.rb', line 486

def <<(task)
    raise "cannot add task #{task} it is still running" if task.thread
    task.reset if task.finished?
    @mutex.synchronize do
        if shutdown? 
            raise "unable to add work while shutting down"
        end
        task.queued_at = Time.now
        @tasks_waiting << task
        if @waiting <= @tasks_waiting.size && @spawned < @max
            spawn_thread
        end
        @cond.signal
    end
    task
end

#backlogFixnum

Number of tasks waiting for execution

Returns:

  • (Fixnum)

    the number of tasks



375
376
377
378
379
# File 'lib/utilrb/thread_pool.rb', line 375

def backlog
    @mutex.synchronize do 
        @tasks_waiting.length
    end
end

#clearObject



343
344
345
346
347
348
349
# File 'lib/utilrb/thread_pool.rb', line 343

def clear
    shutdown
    join
rescue Exception
ensure
    @shutdown = false
end

#joinObject

Blocks until all threads were terminated. This does not terminate any thread by itself and will block for ever if shutdown was not called.



529
530
531
532
533
534
535
536
537
538
# File 'lib/utilrb/thread_pool.rb', line 529

def join
    while true
        if w = @mutex.synchronize { @workers.first }
            w.join
        else
            break
        end
    end
    self
end

#on_task_finished {|Task| ... } ⇒ Object

Given code block is called for every task which was finished even it was terminated.

This can be used to store the result for an event loop

Yields:

  • (Task)

    the code block



546
547
548
549
550
# File 'lib/utilrb/thread_pool.rb', line 546

def on_task_finished (&block)
    @mutex.synchronize do
        @callback_on_task_finished = block
    end
end

#process(*args) {|*args| ... } ⇒ Task

Processes the given block as soon as the next thread is available.

Parameters:

  • args (Array)

    the block arguments

Yields:

  • (*args)

    the block

Returns:



395
396
397
# File 'lib/utilrb/thread_pool.rb', line 395

def process (*args, &block)
    process_with_options(nil,*args,&block)
end

#process?Boolean

Returns true if a worker thread is currently processing a task and no work is queued

Returns:

  • (Boolean)


403
404
405
406
407
# File 'lib/utilrb/thread_pool.rb', line 403

def process?
    @mutex.synchronize do
         waiting != spawned || @tasks_waiting.length > 0
    end
end

#process_with_options(options, *args, &block) ⇒ Task

Processes the given block as soon as the next thread is available with the given options.

Parameters:

  • options (Hash)

    The options of the task.

  • args (Array)

    The arguments for the code block

  • block (#call)

    The code block

Options Hash (options):

  • :sync_key (Object)

    The sync key

  • :callback (Proc)

    The callback

  • :default (Object)

    Default value returned when an error ocurred which was handled.

Returns:



415
416
417
418
419
# File 'lib/utilrb/thread_pool.rb', line 415

def process_with_options(options,*args, &block)
    task = Task.new(options,*args, &block)
    self << task
    task
end

#resize(min, max = nil) ⇒ Object

Changes the minimum and maximum number of threads

Parameters:

  • min (Fixnum)

    the minimum number of threads

  • max (Fixnum) (defaults to: nil)

    the maximum number of threads



360
361
362
363
364
365
366
367
368
369
370
# File 'lib/utilrb/thread_pool.rb', line 360

def resize (min, max = nil)
    @mutex.synchronize do
        @min = min
        @max = max || min
        count = [@tasks_waiting.size,@max - @spawned].min
        count.times do
            spawn_thread
        end
    end
    trim true
end

#shutdownObject

Shuts down all threads.



517
518
519
520
521
522
523
# File 'lib/utilrb/thread_pool.rb', line 517

def shutdown
    tasks = nil
    @mutex.synchronize do
        @shutdown = true
        @cond.broadcast
    end
end

#shutdown?boolean

Checks if the thread pool is shutting down all threads.

Returns:

  • (boolean)


354
# File 'lib/utilrb/thread_pool.rb', line 354

def shutdown?; @shutdown; end

#sync(sync_key, *args) {|*args| ... } ⇒ Object

Processes the given block from current thread but insures that during processing no worker thread is executing a task which has the same sync_key.

This is useful for instance member calls which are not thread safe.

Parameters:

  • sync_key (Object)

    The sync key

Yields:

  • (*args)

    the code block block

Returns:

  • (Object)

    The result of the code block

Raises:

  • (ArgumentError)


431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
# File 'lib/utilrb/thread_pool.rb', line 431

def sync(sync_key,*args,&block)
    raise ArgumentError,"no sync key" unless sync_key

    @mutex.synchronize do
        while(!@sync_keys.add?(sync_key))
            @cond_sync_key.wait @mutex #wait until someone has removed a key
        end
    end
    begin
        result = block.call(*args)
    ensure
        @mutex.synchronize do
            @sync_keys.delete sync_key
        end
        @cond_sync_key.signal
        @cond.signal # worker threads are just waiting for work no matter if it is
        # because of a deletion of a sync_key or a task was added
    end
    result
end

#sync_keysObject

returns the current used sync_keys



337
338
339
340
341
# File 'lib/utilrb/thread_pool.rb', line 337

def sync_keys
    @mutex.synchronize do
        @sync_keys.clone
    end
end

#sync_timeout(sync_key, timeout, *args) {|*args| ... } ⇒ Object

Same as sync but raises Timeout::Error if sync_key cannot be obtained after the given execution time.

Parameters:

  • sync_key (Object)

    The sync key

  • timeout (Float)

    The timeout

Yields:

  • (*args)

    the code block block

Returns:

  • (Object)

    The result of the code block

Raises:

  • (ArgumentError)


459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
# File 'lib/utilrb/thread_pool.rb', line 459

def sync_timeout(sync_key,timeout,*args,&block)
    raise ArgumentError,"no sync key" unless sync_key

    Timeout::timeout(timeout) do
        @mutex.synchronize do
            while(!@sync_keys.add?(sync_key))
                @cond_sync_key.wait @mutex #wait until someone has removed a key
            end
        end
    end
    begin
        result = block.call(*args)
    ensure
        @mutex.synchronize do
            @sync_keys.delete sync_key
        end
        @cond_sync_key.signal
        @cond.signal # worker threads are just waiting for work no matter if it is
        # because of a deletion of a sync_key or a task was added
    end
    result
end

#tasksArray<Task>

Returns an array of the current waiting and running tasks

Returns:



384
385
386
387
388
# File 'lib/utilrb/thread_pool.rb', line 384

def tasks
    @mutex.synchronize do
         @tasks_running.dup + @tasks_waiting.dup
    end
end

#trim(force = false) ⇒ Object

Trims the number of threads if threads are waiting for work and the number of spawned threads is higher than the minimum number.

Parameters:

  • force (boolean) (defaults to: false)

    Trim even if no thread is waiting.



507
508
509
510
511
512
513
# File 'lib/utilrb/thread_pool.rb', line 507

def trim (force = false)
    @mutex.synchronize do
        @trim_requests += 1
        @cond.signal
    end
    self
end