Class: ZK::Threadpool

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/zk/threadpool.rb

Overview

a simple threadpool for running blocks of code off the main thread

Constant Summary collapse

DEFAULT_SIZE =
5

Class Attribute Summary collapse

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(size = nil) ⇒ Threadpool

Returns a new instance of Threadpool.



17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/zk/threadpool.rb', line 17

def initialize(size=nil)
  @size = size || self.class.default_size

  @threadpool = []
  @threadqueue = ::Queue.new

  @mutex = Monitor.new

  @error_callbacks = []

  start!
end

Class Attribute Details

.default_sizeObject

size of the ZK.defer threadpool (defaults to 5)



10
11
12
# File 'lib/zk/threadpool.rb', line 10

def default_size
  @default_size
end

Instance Attribute Details

#sizeObject (readonly)

the size of this threadpool



15
16
17
# File 'lib/zk/threadpool.rb', line 15

def size
  @size
end

Instance Method Details

#defer(callable = nil, &blk) ⇒ Object

Queue an operation to be run on an internal threadpool. You may either provide an object that responds_to?(:call) or pass a block. There is no mechanism for retrieving the result of the operation, it is purely fire-and-forget, so the user is expected to make arrangements for this in their code.



36
37
38
39
40
41
42
43
44
45
# File 'lib/zk/threadpool.rb', line 36

def defer(callable=nil, &blk)
  callable ||= blk

  # XXX(slyphon): do we care if the threadpool is not running?
#       raise Exceptions::ThreadpoolIsNotRunningException unless running?
  raise ArgumentError, "Argument to Threadpool#defer must respond_to?(:call)" unless callable.respond_to?(:call)

  @threadqueue << callable
  nil
end

#on_exception(&blk) ⇒ Object

Note:

if your exception callback block itself raises an exception, I will make fun of you.

register a block to be called back with unhandled exceptions that occur in the threadpool.



73
74
75
76
77
# File 'lib/zk/threadpool.rb', line 73

def on_exception(&blk)
  @mutex.synchronize do
    @error_callbacks << blk
  end
end

#on_threadpool?Boolean

returns true if the current thread is one of the threadpool threads

Returns:

  • (Boolean)


52
53
54
55
# File 'lib/zk/threadpool.rb', line 52

def on_threadpool?
  tp = @mutex.synchronize { @threadpool.dup }
  tp and tp.respond_to?(:include?) and tp.include?(Thread.current)
end

#running?Boolean

Returns:

  • (Boolean)


47
48
49
# File 'lib/zk/threadpool.rb', line 47

def running?
  @mutex.synchronize { @running }
end

#shutdown(timeout = 2) ⇒ Object

join all threads in this threadpool, they will be given a maximum of +timeout+ seconds to exit before they are considered hung and will be ignored (this is an issue with threads in general: see http://blog.headius.com/2008/02/rubys-threadraise-threadkill-timeoutrb.html for more info)

the default timeout is 2 seconds per thread



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/zk/threadpool.rb', line 86

def shutdown(timeout=2)
  @mutex.synchronize do
    return unless @running
    @running = false
    @threadqueue.clear
    @size.times { @threadqueue << KILL_TOKEN }

    threads, @threadpool = @threadpool, []

    while th = threads.shift
      begin
        th.join(timeout)
      rescue Exception => e
        logger.error { "Caught exception shutting down threadpool" }
        logger.error { e.to_std_format }
      end
    end

    @threadqueue = ::Queue.new
  end

  nil
end

#start!Object

starts the threadpool if not already running



58
59
60
61
62
63
64
65
# File 'lib/zk/threadpool.rb', line 58

def start!
  @mutex.synchronize do
    return false if @running
    @running = true
    spawn_threadpool
  end
  true
end