Class: Concurrent::CachedThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/concurrent/cached_thread_pool.rb,
lib/concurrent/cached_thread_pool/worker.rb

Defined Under Namespace

Classes: Worker

Constant Summary collapse

MIN_POOL_SIZE =
1
MAX_POOL_SIZE =
256
DEFAULT_THREAD_IDLETIME =
60

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ CachedThreadPool

Returns a new instance of CachedThreadPool.

Raises:

  • (ArgumentError)



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/concurrent/cached_thread_pool.rb', line 17

# Validates the options and sets up the pool's bookkeeping state.
#
# @param opts [Hash] configuration options
# @option opts [#to_i] :idletime stored in @idletime after +to_i+;
#   DEFAULT_THREAD_IDLETIME is used when the key is missing
# @option opts [Integer] :max_threads stored in @max_threads;
#   MAX_POOL_SIZE is used when neither :max_threads nor :max is given
# @option opts [Integer] :max consulted only when :max_threads is absent
# @raise [ArgumentError] if the idletime is not greater than zero, or the
#   size lies outside MIN_POOL_SIZE..MAX_POOL_SIZE
def initialize(opts = {})
  requested_idletime = opts[:idletime] || DEFAULT_THREAD_IDLETIME
  @idletime = requested_idletime.to_i
  raise ArgumentError, 'idletime must be greater than zero' if @idletime <= 0

  @max_threads = opts[:max_threads] || opts[:max] || MAX_POOL_SIZE
  out_of_range = @max_threads < MIN_POOL_SIZE || @max_threads > MAX_POOL_SIZE
  raise ArgumentError, "size must be from #{MIN_POOL_SIZE} to #{MAX_POOL_SIZE}" if out_of_range

  # Lifecycle state and the objects used to coordinate it.
  @state = :running
  @terminator = Event.new
  @mutex = Mutex.new

  # Worker bookkeeping lists, all initially empty.
  @pool = []
  @idle = []
  @busy = []
end

Instance Attribute Details

#max_threads ⇒ Object

Returns the value of attribute max_threads.



15
16
17
# File 'lib/concurrent/cached_thread_pool.rb', line 15

# Reader for the thread limit held in @max_threads.
#
# @return [Object] the current value of @max_threads
def max_threads
  return @max_threads
end

Instance Method Details

#<<(block) ⇒ Object



35
36
37
38
# File 'lib/concurrent/cached_thread_pool.rb', line 35

# Chainable shorthand for #post: passes +block+ to #post as its block and
# returns the receiver, whatever #post itself returned.
#
# @param block [Proc] the task to hand to #post
# @return [self]
def <<(block)
  post(&block)
  self
end

#kill ⇒ Object



85
86
87
88
89
90
91
92
93
# File 'lib/concurrent/cached_thread_pool.rb', line 85

# Stops the pool immediately. Under the mutex it marks the pool :shutdown,
# calls +kill+ on every idle worker and then every busy worker, and sets the
# terminator event. Does nothing when the pool is already :shutdown.
#
# @return [Object, nil] the result of @terminator.set, or nil when the pool
#   was already :shutdown
def kill
  @mutex.synchronize do
    next if @state == :shutdown

    @state = :shutdown
    [@idle, @busy].each { |workers| workers.each(&:kill) }
    @terminator.set
  end
end

#length ⇒ Object



95
96
97
98
99
# File 'lib/concurrent/cached_thread_pool.rb', line 95

# Number of workers currently tracked by the pool (idle plus busy), read
# under the mutex. Reports 0 whenever the pool is not :running, even if the
# lists still hold workers.
#
# @return [Integer]
def length
  @mutex.synchronize do
    next 0 unless @state == :running

    @idle.length + @busy.length
  end
end

#on_end_task(worker) ⇒ Object



112
113
114
115
116
117
118
# File 'lib/concurrent/cached_thread_pool.rb', line 112

# Callback for a worker that has finished a task. While the pool is :running
# the worker is removed from @busy and appended to @idle; in any other state
# the lists are left untouched.
#
# @param worker [Object] the worker reporting completion
# @return [Array, nil] @idle after the append, or nil when not :running
def on_end_task(worker)
  @mutex.synchronize do
    if @state == :running
      @busy.delete(worker)
      @idle << worker
    end
  end
end

#on_worker_exit(worker) ⇒ Object



101
102
103
104
105
106
107
108
109
110
# File 'lib/concurrent/cached_thread_pool.rb', line 101

# Callback for a worker that has exited. The worker is dropped from both
# lists; if that leaves the pool empty while it is no longer :running, the
# pool is marked :shutdown and the terminator event is set.
#
# @param worker [Object] the worker that exited
# @return [Object, nil] the result of @terminator.set when the pool was
#   finalised here, otherwise nil
def on_worker_exit(worker)
  @mutex.synchronize do
    [@idle, @busy].each { |workers| workers.delete(worker) }

    # A running pool stays running even when it has no workers left.
    next if @state == :running
    next unless @idle.empty? && @busy.empty?

    @state = :shutdown
    @terminator.set
  end
end

#post(*args, &block) ⇒ Object

Raises:

  • (ArgumentError)



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/concurrent/cached_thread_pool.rb', line 40

# Submits a task to the pool.
#
# Under the mutex a worker is chosen in this order: the last element of
# @idle if there is one; otherwise a new worker from create_worker_thread
# while the pool holds fewer than @max_threads workers; otherwise the worker
# at the head of @busy. The chosen worker is appended to @busy, signalled
# with +args+ and +block+, and prune_stale_workers is called afterwards.
#
# @param args [Array] arguments forwarded to the worker's +signal+
# @yield the task, forwarded to the worker's +signal+
# @return [Boolean] true when the task was handed to a worker, false when
#   the pool is not :running
# @raise [ArgumentError] if no block is given
def post(*args, &block)
  raise ArgumentError, 'no block given' unless block

  @mutex.synchronize do
    next false unless @state == :running

    worker =
      if @idle.empty?
        has_room = @idle.length + @busy.length < @max_threads
        has_room ? create_worker_thread : @busy.shift
      else
        @idle.pop
      end

    @busy << worker
    worker.signal(*args, &block)

    prune_stale_workers
    true
  end
end

#running? ⇒ Boolean

Returns:

  • (Boolean)



63
64
65
# File 'lib/concurrent/cached_thread_pool.rb', line 63

# Whether the pool is in the :running state. Reads @state directly, without
# taking the mutex.
#
# @return [Boolean]
def running?
  @state == :running
end

#shutdown ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/concurrent/cached_thread_pool.rb', line 71

# Begins an orderly shutdown. Has no effect unless the pool is :running.
# With no workers tracked, the pool goes straight to :shutdown and the
# terminator event is set; otherwise the pool becomes :shuttingdown and
# +stop+ is called on every idle worker and then every busy worker.
#
# @return [Object, nil] nil when the pool was not :running; otherwise the
#   value of the last expression evaluated
def shutdown
  @mutex.synchronize do
    next unless @state == :running

    if [@idle, @busy].all?(&:empty?)
      @state = :shutdown
      next @terminator.set
    end

    @state = :shuttingdown
    @idle.each(&:stop)
    @busy.each(&:stop)
  end
end

#wait_for_termination(timeout = nil) ⇒ Object



67
68
69
# File 'lib/concurrent/cached_thread_pool.rb', line 67

# Delegates to +wait+ on the terminator event, passing +timeout+ through
# unchanged.
#
# @param timeout [Object, nil] handed to @terminator.wait as-is
# @return [Object] whatever @terminator.wait returns
def wait_for_termination(timeout = nil)
  @terminator.wait(timeout)
end