Class: ZK::Threadpool

Inherits:
Object
  • Object
Includes:
Exceptions, Logger
Defined in:
lib/zk/threadpool.rb

Overview

A simple threadpool for running blocks of code off the main thread.

Constant Summary

DEFAULT_SIZE = 5


Methods included from Logger

#logger, wrapped_logger, wrapped_logger=

Constructor Details

#initialize(size = nil) ⇒ Threadpool

Returns a new instance of Threadpool.



# File 'lib/zk/threadpool.rb', line 18

def initialize(size=nil)
  @size = size || self.class.default_size

  @threadpool = []
  @state = :new
  @queue = []

  @mutex = Mutex.new
  @cond  = ConditionVariable.new

  @error_callbacks = []

  start!
end
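
The constructor sets up a queue, a mutex, and a condition variable, then calls start!. The worker threads themselves are not shown on this page, so the following is only a minimal sketch of how such fields are commonly used together. SketchPool and worker_loop are hypothetical names, not part of ZK, and unlike the shutdown documented below, this sketch lets queued work drain before the threads exit.

```ruby
require 'thread'

# Hypothetical stand-in, NOT ZK::Threadpool: a tiny pool built on the same
# kind of fields the constructor above initializes.
class SketchPool
  def initialize(size = 2)
    @queue = []
    @mutex = Mutex.new
    @cond  = ConditionVariable.new
    @state = :running
    @threads = Array.new(size) { Thread.new { worker_loop } }
  end

  # Enqueue a block and wake any sleeping workers.
  def defer(&blk)
    @mutex.synchronize do
      @queue << blk
      @cond.broadcast
    end
    nil
  end

  # Ask workers to exit once the queue is empty, then wait for them.
  def shutdown
    @mutex.synchronize do
      @state = :shutdown
      @cond.broadcast
    end
    @threads.each(&:join)
    nil
  end

  private

  def worker_loop
    loop do
      op = @mutex.synchronize do
        # Sleep until there is work or the pool is shutting down.
        @cond.wait(@mutex) while @queue.empty? && @state == :running
        @queue.shift # nil when shutting down with an empty queue
      end
      break if op.nil?
      op.call # run the operation outside the lock
    end
  end
end

pool = SketchPool.new(2)
results = Queue.new
3.times { |i| pool.defer { results << i * 2 } }
pool.shutdown
puts results.size
```

Running the operation outside the synchronize block keeps a slow operation from holding up other workers or callers of defer.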

Class Attribute Details

.default_size ⇒ Object

Size of the ZK.defer threadpool (defaults to 5).



# File 'lib/zk/threadpool.rb', line 11

def default_size
  @default_size
end

Instance Attribute Details

#size ⇒ Object (readonly)

The size of this threadpool.



# File 'lib/zk/threadpool.rb', line 16

def size
  @size
end

Instance Method Details

#alive? ⇒ Boolean

Returns true if every thread in the pool is alive. Returns false if any thread has died or if the pool has no threads.

Returns:

  • (Boolean)


# File 'lib/zk/threadpool.rb', line 35

def alive?
  @mutex.lock
  begin
    !@threadpool.empty? and @threadpool.all?(&:alive?)
  ensure
    @mutex.unlock rescue nil
  end
end

#defer(callable = nil, &blk) ⇒ Object

Queues an operation to run on the internal threadpool. You may pass either an object that satisfies respond_to?(:call) or a block. There is no mechanism for retrieving the result of the operation: it is purely fire-and-forget, so callers are expected to make their own arrangements for this.

Raises:

  • (ArgumentError) if no callable or block is given, or the callable does not respond to call


# File 'lib/zk/threadpool.rb', line 50

def defer(callable=nil, &blk)
  callable ||= blk

  raise ArgumentError, "Argument to Threadpool#defer must respond_to?(:call)" unless callable.respond_to?(:call)

  @mutex.lock
  begin
    @queue << callable
    @cond.broadcast
  ensure
    @mutex.unlock rescue nil
  end

  nil
end
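
Because defer always returns nil, one way to get a result back is to have the deferred block push its outcome onto a Queue that the caller pops. This is only a sketch: defer_with_result and InlineThreadPool are hypothetical names, and InlineThreadPool exists solely so the snippet runs without the zk gem. Any object with a compatible defer method should work in its place.

```ruby
require 'thread'

# Hypothetical helper (not part of ZK): wraps a fire-and-forget #defer so the
# caller can block on the outcome. Returns a lambda; calling it waits for the
# result and re-raises any StandardError from the block. Call it only once.
def defer_with_result(pool, &blk)
  mailbox = Queue.new
  pool.defer do
    begin
      mailbox << [:ok, blk.call]
    rescue StandardError => e
      mailbox << [:error, e]
    end
  end
  lambda do
    status, value = mailbox.pop
    raise value if status == :error
    value
  end
end

# Stand-in pool (an assumption for this sketch): one new thread per operation.
class InlineThreadPool
  def defer(callable = nil, &blk)
    callable ||= blk
    raise ArgumentError, "must respond_to?(:call)" unless callable.respond_to?(:call)
    Thread.new { callable.call }
    nil
  end
end

waiter = defer_with_result(InlineThreadPool.new) { 6 * 7 }
puts waiter.call # prints 42
```

Waiting on such a result from inside a threadpool thread can stall the pool if every worker is busy, which is one reason to check on_threadpool? before blocking.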

#on_exception(&blk) ⇒ Object

Note:

If your exception callback block itself raises an exception, I will make fun of you.

Registers a block to be called back with unhandled exceptions that occur in the threadpool.



# File 'lib/zk/threadpool.rb', line 156

def on_exception(&blk)
  @mutex.synchronize do
    @error_callbacks << blk
  end
end
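
How the pool invokes these callbacks is not shown on this page. The sketch below illustrates one plausible dispatch pattern, in which a callback that raises is logged and skipped so it cannot interfere with the remaining callbacks. The method name dispatch_to_error_callbacks is hypothetical, and the callbacks array plays the role of @error_callbacks.

```ruby
# Hypothetical sketch, not the library's implementation: hand an exception to
# each registered callback in registration order.
def dispatch_to_error_callbacks(callbacks, exception)
  callbacks.each do |cb|
    begin
      cb.call(exception)
    rescue StandardError => e
      # A misbehaving callback should not stop the remaining callbacks.
      warn "error callback raised #{e.class}: #{e.message}"
    end
  end
end

seen = []
callbacks = [
  ->(e) { seen << e.message },
  ->(_e) { raise "bad callback" }, # the case the Note above warns about
  ->(e) { seen << e.class.name }
]

begin
  raise ArgumentError, "boom"
rescue StandardError => e
  dispatch_to_error_callbacks(callbacks, e)
end

p seen # prints ["boom", "ArgumentError"]
```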

#on_threadpool? ⇒ Boolean

Returns true if the current thread is one of the threadpool threads.

Returns:

  • (Boolean)


# File 'lib/zk/threadpool.rb', line 76

def on_threadpool?
  tp = nil

  @mutex.synchronize do
    return false unless @threadpool # you can't dup nil
    tp = @threadpool.dup
  end

  tp.respond_to?(:include?) and tp.include?(Thread.current)
end
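
A check like this is typically useful for deciding whether it is safe to hand work to the pool or better to run it in place. The sketch below shows that pattern under stated assumptions: run_or_defer and OneThreadPool are hypothetical names, and OneThreadPool is a single-thread stand-in that only mimics the on_threadpool? and defer methods documented here.

```ruby
require 'thread'

# Hypothetical helper: run the block inline when already on a pool thread,
# otherwise queue it. `pool` needs #on_threadpool? and #defer.
def run_or_defer(pool, &blk)
  if pool.on_threadpool?
    blk.call
    :ran_inline
  else
    pool.defer(&blk)
    :deferred
  end
end

# Stand-in pool with one worker thread (an assumption for this sketch).
class OneThreadPool
  def initialize
    @jobs = Queue.new
    @thread = Thread.new do
      while (job = @jobs.pop) != :stop
        job.call
      end
    end
  end

  def on_threadpool?
    Thread.current == @thread
  end

  def defer(&blk)
    @jobs << blk
    nil
  end

  def shutdown
    @jobs << :stop
    @thread.join
    nil
  end
end

pool = OneThreadPool.new
outcomes = Queue.new
first = run_or_defer(pool) do
  # This block runs on the pool thread, so the nested call runs inline.
  outcomes << run_or_defer(pool) { :noop }
end
nested = outcomes.pop
pool.shutdown

puts first  # prints deferred
puts nested # prints ran_inline
```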

#running? ⇒ Boolean

Returns true if the threadpool's state is :running.

Returns:

  • (Boolean)


# File 'lib/zk/threadpool.rb', line 66

def running?
  @mutex.lock
  begin
    @state == :running
  ensure
    @mutex.unlock rescue nil
  end
end

#shutdown(timeout = 2) ⇒ Object

Joins all threads in this threadpool. Each thread is given a maximum of timeout seconds to exit before it is considered hung and is ignored (this is an issue with threads in general: see http://blog.headius.com/2008/02/rubys-threadraise-threadkill-timeoutrb.html for more info). Any operations still waiting in the queue are discarded. Returns false if the threadpool has already been shut down.

The default timeout is 2 seconds per thread.



# File 'lib/zk/threadpool.rb', line 169

def shutdown(timeout=2)
  threads = nil

  @mutex.lock
  begin
    return false if @state == :shutdown
    @state = :shutdown

    @queue.clear
    threads, @threadpool = @threadpool, []
    @cond.broadcast
  ensure
    @mutex.unlock rescue nil
  end

  join_all(threads)

  nil
end
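
The join_all helper called above is not shown on this page. As a rough illustration of giving each thread a bounded time to exit, the sketch below relies on Thread#join(limit), which returns nil if the thread is still running after limit seconds and the thread itself otherwise. The name join_with_timeout is hypothetical.

```ruby
# Hypothetical sketch, not the library's join_all: wait up to `timeout`
# seconds for each thread and split them into [finished, still_running].
def join_with_timeout(threads, timeout)
  threads.partition { |th| th.join(timeout) }
end

quick = Thread.new { :done }
hung  = Thread.new { sleep } # sleeps forever, standing in for a hung thread

finished, ignored = join_with_timeout([quick, hung], 0.1)
puts finished.size # prints 1
puts ignored.size  # prints 1

hung.kill # clean up the demo thread
```

The timeout applies to each thread in turn, so the worst-case wait is the timeout multiplied by the number of threads.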

#start! ⇒ Object

Starts the threadpool if it is not already running. Returns false if the pool was already running, true otherwise.



# File 'lib/zk/threadpool.rb', line 88

def start!
  @mutex.synchronize do
    return false if @state == :running
    @state = :running
    spawn_threadpool
  end

  true
end