Class: ZK::Threadpool
Overview
a simple threadpool for running blocks of code off the main thread
Constant Summary collapse
- DEFAULT_SIZE =
5
Class Attribute Summary collapse
-
.default_size ⇒ Object
size of the ZK.defer threadpool (defaults to 5).
Instance Attribute Summary collapse
-
#size ⇒ Object
readonly
the size of this threadpool.
Instance Method Summary collapse
-
#defer(callable = nil, &blk) ⇒ Object
Queue an operation to be run on an internal threadpool.
-
#initialize(size = nil) ⇒ Threadpool
constructor
A new instance of Threadpool.
-
#on_exception(&blk) ⇒ Object
register a block to be called back with unhandled exceptions that occur in the threadpool.
-
#on_threadpool? ⇒ Boolean
returns true if the current thread is one of the threadpool threads.
- #running? ⇒ Boolean
-
#shutdown(timeout = 2) ⇒ Object
join all threads in this threadpool, they will be given a maximum of +timeout+ seconds to exit before they are considered hung and will be ignored (this is an issue with threads in general: see http://blog.headius.com/2008/02/rubys-threadraise-threadkill-timeoutrb.html for more info).
-
#start! ⇒ Object
starts the threadpool if not already running.
Constructor Details
#initialize(size = nil) ⇒ Threadpool
Returns a new instance of Threadpool.
17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/zk/threadpool.rb', line 17 def initialize(size=nil) @size = size || self.class.default_size @threadpool = [] @threadqueue = ::Queue.new @mutex = Monitor.new @error_callbacks = [] start! end |
Class Attribute Details
.default_size ⇒ Object
size of the ZK.defer threadpool (defaults to 5)
10 11 12 |
# File 'lib/zk/threadpool.rb', line 10 def default_size @default_size end |
Instance Attribute Details
#size ⇒ Object (readonly)
the size of this threadpool
15 16 17 |
# File 'lib/zk/threadpool.rb', line 15 def size @size end |
Instance Method Details
#defer(callable = nil, &blk) ⇒ Object
Queue an operation to be run on an internal threadpool. You may either provide an object that responds_to?(:call) or pass a block. There is no mechanism for retrieving the result of the operation, it is purely fire-and-forget, so the user is expected to make arrangements for this in their code.
36 37 38 39 40 41 42 43 44 45 |
# File 'lib/zk/threadpool.rb', line 36 def defer(callable=nil, &blk) callable ||= blk # XXX(slyphon): do we care if the threadpool is not running? # raise Exceptions::ThreadpoolIsNotRunningException unless running? raise ArgumentError, "Argument to Threadpool#defer must respond_to?(:call)" unless callable.respond_to?(:call) @threadqueue << callable nil end |
#on_exception(&blk) ⇒ Object
if your exception callback block itself raises an exception, I will make fun of you.
register a block to be called back with unhandled exceptions that occur in the threadpool.
73 74 75 76 77 |
# File 'lib/zk/threadpool.rb', line 73 def on_exception(&blk) @mutex.synchronize do @error_callbacks << blk end end |
#on_threadpool? ⇒ Boolean
returns true if the current thread is one of the threadpool threads
52 53 54 55 |
# File 'lib/zk/threadpool.rb', line 52 def on_threadpool? tp = @mutex.synchronize { @threadpool.dup } tp and tp.respond_to?(:include?) and tp.include?(Thread.current) end |
#running? ⇒ Boolean
47 48 49 |
# File 'lib/zk/threadpool.rb', line 47 def running? @mutex.synchronize { @running } end |
#shutdown(timeout = 2) ⇒ Object
join all threads in this threadpool, they will be given a maximum of +timeout+ seconds to exit before they are considered hung and will be ignored (this is an issue with threads in general: see http://blog.headius.com/2008/02/rubys-threadraise-threadkill-timeoutrb.html for more info)
the default timeout is 2 seconds per thread
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/zk/threadpool.rb', line 86 def shutdown(timeout=2) @mutex.synchronize do return unless @running @running = false @threadqueue.clear @size.times { @threadqueue << KILL_TOKEN } threads, @threadpool = @threadpool, [] while th = threads.shift begin th.join(timeout) rescue Exception => e logger.error { "Caught exception shutting down threadpool" } logger.error { e.to_std_format } end end @threadqueue = ::Queue.new end nil end |
#start! ⇒ Object
starts the threadpool if not already running
58 59 60 61 62 63 64 65 |
# File 'lib/zk/threadpool.rb', line 58 def start! @mutex.synchronize do return false if @running @running = true spawn_threadpool end true end |