Class: Utilrb::ThreadPool

Inherits:
Object show all
Defined in:
lib/utilrb/thread_pool.rb

Overview

ThreadPool implementation inspired by github.com/meh/ruby-threadpool

Examples:

Using a thread pool of 10 threads

# Create a pool; with a single argument min and max are both 10.
pool = ThreadPool.new(10)
# Queue ten blocks for execution by the worker threads.
0.upto(9) do 
   pool.process do 
     sleep 1
     puts "done"
   end
end
# Signal shutdown, then block until all worker threads have terminated
# (join alone would block forever without a prior shutdown).
pool.shutdown
pool.join

Author:

Defined Under Namespace

Classes: Task

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(min = 5, max = min) ⇒ ThreadPool

A ThreadPool

Parameters:

  • min (Fixnum) (defaults to: 5)

    the minimum number of threads

  • max (Fixnum) (defaults to: min)

    the maximum number of threads



292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
# File 'lib/utilrb/thread_pool.rb', line 292

# Sets up the internal state of the pool and starts +min+ worker threads.
#
# @param [Fixnum] min the minimum number of threads
# @param [Fixnum] max the maximum number of threads
def initialize(min = 5, max = min)
    @min, @max = min, max

    # Synchronisation primitives
    @mutex = Mutex.new
    @cond = ConditionVariable.new             # signalled when work is added
    @cond_sync_key = ConditionVariable.new    # signalled when a sync key is removed

    # Task bookkeeping
    @tasks_waiting = []         # tasks waiting for execution
    @tasks_running = []         # tasks which are currently running
    @sync_keys = Set.new        # sync keys which are currently in use
    @callback_on_task_finished = nil
    @pipes = nil

    # Statistics
    @avg_run_time = 0           # average run time of a task in s [Float]
    @avg_wait_time = 0          # average time a task has to wait for execution in s [Float]

    # Worker bookkeeping
    @workers = []               # thread pool
    @spawned = 0
    @waiting = 0
    @trim_requests = 0
    @auto_trim = false
    @shutdown = false

    # Threads are spawned while holding the lock, as everywhere else in the pool.
    @mutex.synchronize do
        min.times { spawn_thread }
    end
end

Instance Attribute Details

#auto_trimBoolean

Auto trim automatically reduces the number of worker threads if there are too many threads waiting for work.

Returns:

  • (Boolean)


286
287
288
# File 'lib/utilrb/thread_pool.rb', line 286

# Whether auto trim is enabled, i.e. whether the pool automatically reduces
# the number of worker threads if too many of them are waiting for work.
#
# @return [Boolean]
def auto_trim
  @auto_trim
end

#avg_run_timeFloat (readonly)

The average execution time of a (running) task.

Returns:

  • (Float)


276
277
278
# File 'lib/utilrb/thread_pool.rb', line 276

# The average execution time of a (running) task.
#
# @return [Float]
def avg_run_time
  @avg_run_time
end

#avg_wait_timeFloat (readonly)

The average waiting time of a task before being executed.

Returns:

  • (Float)


281
282
283
# File 'lib/utilrb/thread_pool.rb', line 281

# The average time a task waits before being executed.
#
# @return [Float]
def avg_wait_time
  @avg_wait_time
end

#maxFixnum

The maximum number of worker threads.

Returns:

  • (Fixnum)


261
262
263
# File 'lib/utilrb/thread_pool.rb', line 261

# The maximum number of worker threads.
#
# @return [Fixnum]
def max
  @max
end

#minFixnum

The minimum number of worker threads.

Returns:

  • (Fixnum)


256
257
258
# File 'lib/utilrb/thread_pool.rb', line 256

# The minimum number of worker threads.
#
# @return [Fixnum]
def min
  @min
end

#spawnedFixnum (readonly)

The real number of worker threads.

Returns:

  • (Fixnum)


266
267
268
# File 'lib/utilrb/thread_pool.rb', line 266

# The real number of worker threads.
#
# @return [Fixnum]
def spawned
  @spawned
end

#waitingFixnum (readonly)

The number of worker threads waiting for work.

Returns:

  • (Fixnum)


271
272
273
# File 'lib/utilrb/thread_pool.rb', line 271

# The number of worker threads waiting for work.
#
# @return [Fixnum]
def waiting
  @waiting
end

Instance Method Details

#<<(task) ⇒ Task

Processes the given Task as soon as the next thread is available

Parameters:

  • task (Task)

    The task.

Returns:



455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
# File 'lib/utilrb/thread_pool.rb', line 455

# Queues the given task for execution and wakes up a worker thread.
#
# A task which has already finished is reset before it is queued again.
# An additional worker thread is spawned if no worker is idle and the pool
# has not yet reached its maximum size.
#
# @param [Task] task The task.
# @return [Task] the queued task
# @raise [RuntimeError] if the task is still running or the pool is
#   shutting down
def <<(task)
    if task.thread
        raise "cannot add task #{task} it is still running"
    end
    task.reset if task.finished?

    @mutex.synchronize do
        raise "unable to add work while shutting down" if shutdown?

        task.queued_at = Time.now
        @tasks_waiting.push(task)

        # grow the pool only when nobody is idle and there is still room
        spawn_thread if @waiting == 0 && @spawned < @max

        @cond.signal
    end
    task
end

#backlogFixnum

Number of tasks waiting for execution

Returns:

  • (Fixnum)

    the number of tasks



374
375
376
377
378
# File 'lib/utilrb/thread_pool.rb', line 374

# Number of tasks which are queued but not yet picked up by a worker.
#
# @return [Fixnum] the number of waiting tasks
def backlog
    @mutex.synchronize { @tasks_waiting.size }
end

#clearObject



342
343
344
345
346
347
348
# File 'lib/utilrb/thread_pool.rb', line 342

# Shuts the pool down, waits until all worker threads have terminated and
# finally resets the shutdown flag, whether or not an error occurred.
#
# Errors raised by shutdown or join are ignored on a best-effort basis.
# Only StandardError is swallowed: interrupts, exit requests and
# out-of-memory conditions are propagated to the caller instead of being
# silently discarded.
#
# @return [Object, nil] the result of join, or nil if an error was ignored
def clear
    shutdown
    join
rescue StandardError
    # best effort: a failing shutdown/join must not prevent the reset below
ensure
    @shutdown = false
end

#joinObject

Blocks until all threads have terminated. This does not terminate any thread by itself and will block forever if shutdown was not called.



500
501
502
503
# File 'lib/utilrb/thread_pool.rb', line 500

# Blocks until the list of worker threads is empty.
#
# This does not terminate any thread by itself.
#
# @return [ThreadPool] self
def join
    # Fetch the next worker and test for its presence in one step. Checking
    # empty? first and calling first afterwards would fail with a
    # NoMethodError on nil if the last worker removed itself in between.
    while (worker = @workers.first)
        worker.join
    end
    self
end

#on_task_finished {|Task| ... } ⇒ Object

The given code block is called for every task that has finished, even if it was terminated.

This can be used to store the result for an event loop

Yields:

  • (Task)

    the code block



511
512
513
514
515
# File 'lib/utilrb/thread_pool.rb', line 511

# Registers the block which is stored as the task-finished callback,
# replacing any previously registered block.
#
# @yield [Task] the code block
# @return [Proc, nil] the stored block
def on_task_finished(&block)
    @mutex.synchronize { @callback_on_task_finished = block }
end

#process(*args) {|*args| ... } ⇒ Task

Processes the given block as soon as the next thread is available.

Parameters:

  • args (Array)

    the block arguments

Yields:

  • (*args)

    the block

Returns:



394
395
396
# File 'lib/utilrb/thread_pool.rb', line 394

# Processes the given block as soon as the next thread is available.
# Delegates to process_with_options, passing nil as the options.
#
# @param [Array] args the block arguments
# @yield [*args] the block
# @return [Task] the value returned by process_with_options
def process (*args, &block)
    process_with_options(nil,*args,&block)
end

#process?Boolean

Returns true if a worker thread is currently processing a task or if work is still queued

Returns:

  • (Boolean)


402
403
404
405
406
# File 'lib/utilrb/thread_pool.rb', line 402

# Checks whether the pool still has something to do.
#
# @return [Boolean] true if at least one worker thread is not waiting for
#   work or if tasks are still queued
def process?
    @mutex.synchronize do
        busy = waiting != spawned
        busy || !@tasks_waiting.empty?
    end
end

#process_with_options(options, *args, &block) ⇒ Task

Processes the given block as soon as the next thread is available with the given options.

Parameters:

  • options (Hash)

    The options of the task.

  • args (Array)

    The arguments for the code block

  • block (#call)

    The code block

Options Hash (options):

  • :sync_key (Object)

    The sync key

  • :callback (Proc)

    The callback

  • :default (Object)

    Default value returned when an error occurred which was handled.

Returns:



414
415
416
417
418
# File 'lib/utilrb/thread_pool.rb', line 414

# Wraps the given block into a new Task and queues it on this pool.
#
# @param [Hash] options The options of the task.
# @param [Array] args The arguments for the code block
# @param [#call] block The code block
# @return [Task] the newly created task
def process_with_options(options, *args, &block)
    Task.new(options, *args, &block).tap do |new_task|
        self << new_task
    end
end

#resize(min, max = nil) ⇒ Object

Changes the minimum and maximum number of threads

Parameters:

  • min (Fixnum)

    the minimum number of threads

  • max (Fixnum) (defaults to: nil)

    the maximum number of threads



359
360
361
362
363
364
365
366
367
368
369
# File 'lib/utilrb/thread_pool.rb', line 359

# Changes the minimum and maximum number of threads.
#
# After the limits are updated, worker threads are spawned so that there is
# one thread per queued task, but never more than max threads and never
# fewer than min threads in total. Threads which already exist are taken
# into account, so no thread is spawned when the pool is large enough.
# Finally a forced trim is requested to start removing surplus threads.
#
# @param [Fixnum] min the minimum number of threads
# @param [Fixnum] max the maximum number of threads (defaults to min)
# @return [ThreadPool] the result of trim
def resize(min, max = nil)
    @mutex.synchronize do
        @min = min
        @max = max || min

        wanted = [@spawned + @tasks_waiting.size, @max].min
        wanted = [wanted, @min].max
        # The count is computed once up front; a negative difference (pool is
        # already larger than wanted) simply spawns nothing.
        (wanted - @spawned).times { spawn_thread }
    end
    trim true
end

#shutdownObject

Shuts down all threads.



488
489
490
491
492
493
494
# File 'lib/utilrb/thread_pool.rb', line 488

# Marks the pool as shutting down and wakes up every thread waiting on the
# work condition variable so it can observe the flag.
def shutdown
    @mutex.synchronize { @shutdown = true }
    @cond.broadcast
end

#shutdown?boolean

Checks if the thread pool is shutting down all threads.

Returns:

  • (boolean)


353
# File 'lib/utilrb/thread_pool.rb', line 353

# Checks if the thread pool is shutting down all threads.
#
# @return [boolean] the current value of the shutdown flag
def shutdown?
    @shutdown
end

#sync(sync_key, *args) {|*args| ... } ⇒ Object

Processes the given block from the current thread but ensures that during processing no worker thread is executing a task which has the same sync_key.

This is useful for instance member calls which are not thread safe.

Parameters:

  • sync_key (Object)

    The sync key

Yields:

  • (*args)

    the code block

Returns:

  • (Object)

    The result of the code block

Raises:

  • (ArgumentError)


430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
# File 'lib/utilrb/thread_pool.rb', line 430

# Runs the given block in the calling thread while holding sync_key.
#
# The call blocks until sync_key can be added to the set of keys in use.
# The key is removed again once the block has finished, even if the block
# raised.
#
# @param [Object] sync_key The sync key
# @param [Array] args the arguments passed to the block
# @yield [*args] the code block
# @return [Object] The result of the code block
# @raise [ArgumentError] if no sync key or no block is given
def sync(sync_key, *args, &block)
    raise ArgumentError, "no sync key" unless sync_key
    # Fail before the key is registered instead of with a NoMethodError on nil.
    raise ArgumentError, "no block given" unless block

    @mutex.synchronize do
        # Set#add? returns nil as long as the key is already in use
        until @sync_keys.add?(sync_key)
            @cond_sync_key.wait @mutex # wait until someone has removed a key
        end
    end
    begin
        block.call(*args)
    ensure
        @mutex.synchronize do
            @sync_keys.delete sync_key
        end
        # Waiters may be waiting for different keys. Waking only one of them
        # could pick a waiter whose key is still in use and leave the waiter
        # for this key asleep, so all of them are woken to re-check.
        @cond_sync_key.broadcast
        @cond.signal # worker threads are just waiting for work no matter if it is
        # because of a deletion of a sync_key or a task was added
    end
end

#sync_keysObject

Returns the sync keys which are currently in use



336
337
338
339
340
# File 'lib/utilrb/thread_pool.rb', line 336

# Returns a copy of the sync keys which are currently in use, taken while
# holding the pool lock.
#
# @return [Set] a clone of the internal key set
def sync_keys
    @mutex.synchronize { @sync_keys.clone }
end

#tasksArray<Task>

Returns an array of the current waiting and running tasks

Returns:



383
384
385
386
387
# File 'lib/utilrb/thread_pool.rb', line 383

# Returns a new array holding the running tasks followed by the waiting
# tasks, taken while holding the pool lock.
#
# @return [Array<Task>]
def tasks
    @mutex.synchronize do
        # Array#+ already builds a fresh array, the internal lists stay untouched
        @tasks_running + @tasks_waiting
    end
end

#trim(force = false) ⇒ Object

Trims the number of threads if threads are waiting for work and the number of spawned threads is higher than the minimum number.

Parameters:

  • force (boolean) (defaults to: false)

    Trim even if no thread is waiting.



476
477
478
479
480
481
482
483
484
# File 'lib/utilrb/thread_pool.rb', line 476

# Requests the removal of one worker thread if the pool, minus the already
# pending trim requests, is larger than the minimum and either a thread is
# waiting for work or force is set.
#
# @param [boolean] force Trim even if no thread is waiting.
# @return [ThreadPool] self
def trim(force = false)
    @mutex.synchronize do
        above_minimum = @spawned - @trim_requests > @min
        if above_minimum && (force || @waiting > 0)
            @trim_requests += 1
            @cond.signal
        end
    end
    self
end