Class: ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/threadpool.rb

Defined Under Namespace

Classes: Task

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(min, max = nil, &block) ⇒ ThreadPool

Returns a new instance of ThreadPool.



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/threadpool.rb', line 82

def initialize (min, max = nil, &block)
  @min   = min
  @max   = max || min
  @block = block

  @cond  = ConditionVariable.new
  @mutex = Mutex.new

  @todo     = []
  @workers  = []
  @timeouts = {}

  @spawned       = 0
  @waiting       = 0
  @shutdown      = false
  @trim_requests = 0
  @auto_trim     = false

  @mutex.synchronize {
    min.times {
      spawn_thread
    }
  }
end

Instance Attribute Details

#maxObject (readonly)

Returns the value of attribute max.



80
81
82
# File 'lib/threadpool.rb', line 80

def max
  @max
end

#minObject (readonly)

Returns the value of attribute min.



80
81
82
# File 'lib/threadpool.rb', line 80

def min
  @min
end

#spawnedObject (readonly)

Returns the value of attribute spawned.



80
81
82
# File 'lib/threadpool.rb', line 80

def spawned
  @spawned
end

Instance Method Details

#auto_trim!Object



110
# File 'lib/threadpool.rb', line 110

def auto_trim!;    @auto_trim = true;  end

#auto_trim?Boolean

Returns:

  • (Boolean)


109
# File 'lib/threadpool.rb', line 109

def auto_trim?;    @auto_trim;         end

#backlogObject



120
121
122
123
124
# File 'lib/threadpool.rb', line 120

def backlog
  @mutex.synchronize {
    @todo.length
  }
end

#joinObject



180
181
182
183
184
185
186
# File 'lib/threadpool.rb', line 180

def join
  @timeout.join if @timeout

  @workers.first.join until @workers.empty?

  self
end

#no_auto_trim!Object



111
# File 'lib/threadpool.rb', line 111

def no_auto_trim!; @auto_trim = false; end

#process(*args, &block) ⇒ Object Also known as: <<



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/threadpool.rb', line 126

def process (*args, &block)
  unless block || @block
    raise ArgumentError, 'you must pass a block'
  end

  task = Task.new(self, *args, &(block || @block))

  @mutex.synchronize {
    raise 'unable to add work while shutting down' if shutdown?

    @todo << task

    if @waiting == 0 && @spawned < @max
      spawn_thread
    end

    @cond.signal
  }

  task
end

#resize(min, max = nil) ⇒ Object



113
114
115
116
117
118
# File 'lib/threadpool.rb', line 113

def resize (min, max = nil)
  @min = min
  @max = max || min

  trim!
end

#shutdownObject



174
175
176
177
178
# File 'lib/threadpool.rb', line 174

def shutdown
  shutdown!

  join
end

#shutdown!Object



165
166
167
168
169
170
171
172
# File 'lib/threadpool.rb', line 165

def shutdown!
  @mutex.synchronize {
    @shutdown = true
    @cond.broadcast
  }

  wake_up_timeout
end

#shutdown?Boolean

Returns:

  • (Boolean)


107
# File 'lib/threadpool.rb', line 107

def shutdown?; @shutdown; end

#shutdown_after(timeout) ⇒ Object



200
201
202
203
204
205
206
207
208
# File 'lib/threadpool.rb', line 200

def shutdown_after (timeout)
  Thread.new {
    sleep timeout

    shutdown
  }

  self
end

#timeout_for(task, timeout) ⇒ Object



188
189
190
191
192
193
194
195
196
197
198
# File 'lib/threadpool.rb', line 188

def timeout_for (task, timeout)
  unless @timeout
    spawn_timeout_thread
  end

  @mutex.synchronize {
    @timeouts[task] = timeout

    wake_up_timeout
  }
end

#trim(force = false) ⇒ Object



150
151
152
153
154
155
156
157
158
159
# File 'lib/threadpool.rb', line 150

def trim (force = false)
  @mutex.synchronize {
    if (force || @waiting > 0) && @spawned - @trim_requests > @min
      @trim_requests -= 1
      @cond.signal
    end
  }

  self
end

#trim!Object



161
162
163
# File 'lib/threadpool.rb', line 161

def trim!
  trim true
end

#wake_up_timeoutObject



210
211
212
213
214
# File 'lib/threadpool.rb', line 210

def wake_up_timeout
  if @pipes
    @pipes.last.write_nonblock 'x' rescue nil
  end
end