Class: ZeevexConcurrency::ThreadPool::FixedPool

Inherits:
Object
  • Object
show all
Includes:
Stubs
Defined in:
lib/zeevex_concurrency/thread_pool.rb

Overview

Use a fixed pool of N threads to process jobs

Defined Under Namespace

Classes: HaltObject

Instance Method Summary collapse

Constructor Details

#initialize(count = -1) ⇒ FixedPool

Returns a new instance of FixedPool.



194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/zeevex_concurrency/thread_pool.rb', line 194

# Sets up the pool's bookkeeping state and immediately starts the workers.
#
# @param count [Integer] number of worker threads to run. The default, -1,
#   is a sentinel meaning "twice ZeevexConcurrency::ThreadPool.cpu_count".
def initialize(count = -1)
  use_default = (count == -1)

  @count      = use_default ? ZeevexConcurrency::ThreadPool.cpu_count * 2 : count
  @queue      = Queue.new        # jobs waiting to be popped by a worker
  @mutex      = Mutex.new        # serializes start/stop
  @group      = ThreadGroup.new  # holds the worker threads
  @busy_count = Atomic.new(0)

  start
end

Instance Method Details

#backlog ⇒ Object

how many tasks are waiting



278
279
280
# File 'lib/zeevex_concurrency/thread_pool.rb', line 278

# Reports how many jobs are still sitting in the queue, i.e. have been
# enqueued but not yet popped by a worker.
#
# @return [Integer] current queue length
def backlog
  @queue.length
end

#busy? ⇒ Boolean

Returns:

  • (Boolean)


259
260
261
# File 'lib/zeevex_concurrency/thread_pool.rb', line 259

# Tells whether the pool has no spare workers left.
#
# @return [Boolean] true when free_count is zero
def busy?
  free_count.zero?
end

#busy_count ⇒ Object



267
268
269
# File 'lib/zeevex_concurrency/thread_pool.rb', line 267

# Reads the current value of the pool's busy counter (the Atomic created
# with an initial value of 0 in the constructor and reset to 0 by #stop).
#
# NOTE(review): presumably this is the number of workers currently running a
# job, maintained by _start_work/_end_work -- those helpers are not visible
# here, so confirm.
#
# @return [Object] whatever the counter currently holds
def busy_count
  @busy_count.value
end

#enqueue(runnable = nil, &block) ⇒ Object



207
208
209
# File 'lib/zeevex_concurrency/thread_pool.rb', line 207

# Adds a job to the end of the queue for a worker to pick up.
#
# The job may be given either as an object or as a block; both are handed to
# _check_args, and whatever it returns is what gets queued.
#
# @param runnable [Object, nil] the job object, if not using a block
# @param block [Proc] the job, if not passing +runnable+
# @return [Queue] the pool's queue
def enqueue(runnable = nil, &block)
  job = _check_args(runnable, block)
  @queue.push(job)
end

#flush ⇒ Object

flush queued jobs



283
284
285
# File 'lib/zeevex_concurrency/thread_pool.rb', line 283

# Discards every job that is still waiting in the queue.
#
# Only the queue is cleared; nothing here touches the worker threads, so a
# job that a worker has already popped is not affected by this call.
def flush
  @queue.clear
end

#free_count ⇒ Object



271
272
273
# File 'lib/zeevex_concurrency/thread_pool.rb', line 271

# Computes how many workers are not accounted for by the busy counter.
#
# @return [Integer] worker_count minus busy_count
def free_count
  total = worker_count
  total - busy_count
end

#join ⇒ Object

This is tricky, as there may be one or more workers stuck in VERY long-running jobs, so what we do is:

Insert a job that stops processing. When it runs, we can be sure that all previous jobs have been popped off the queue. However, previous jobs may still be running, so we have to ask each thread to pause until they have all paused.



296
297
298
299
300
301
302
303
304
# File 'lib/zeevex_concurrency/thread_pool.rb', line 296

# Blocks the caller until every worker has picked up a halt marker.
#
# A single HaltObject, built with the worker count, is queued once per
# worker. Because it goes to the back of the queue, everything enqueued
# earlier has been popped by the time the markers are reached.
#
# @return [Object] whatever HaltObject#wait returns
def join
  barrier = HaltObject.new(@count)

  # one marker per worker, all sharing the same barrier object
  @count.times { @queue.push(barrier) }

  # returns once the barrier reports that every worker has checked in
  barrier.wait
end

#start ⇒ Object



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# File 'lib/zeevex_concurrency/thread_pool.rb', line 211

# Spawns the worker threads, unless the pool is already running.
#
# Runs under @mutex so that concurrent start/stop calls cannot interleave.
# Each of the @count workers loops until a stop is requested, popping one item
# at a time off the shared queue:
#
# * a HaltObject is told the worker has reached it (halt!) and the worker
#   then moves straight on to the next queue item;
# * anything else is treated as a job and invoked with #call, bracketed by
#   _start_work / _end_work.
#
# NOTE(review): _start_work and _end_work are defined outside this listing;
# presumably they maintain the busy counter -- confirm.
#
# Any exception raised while handling an item is logged through
# ZeevexConcurrency.logger and the worker keeps running.
def start
  @mutex.synchronize do
    return if @started

    @stop_requested = false

    @count.times do
      thr = Thread.new do
        until @stop_requested
          begin
            work = @queue.pop

            # notify that this thread is stopping and wait for the signal to continue
            if work.is_a?(HaltObject)
              work.halt!
              # `next`, not `continue`: Ruby has no `continue` keyword, so the
              # old code raised NameError here and logged a bogus error.
              next
            end

            _start_work
            begin
              work.call
            rescue Exception
              # Keep _start_work/_end_work paired when the job fails, then let
              # the outer handler log it. Deliberately not `ensure`, so a
              # worker killed by #stop does not run _end_work.
              _end_work
              raise
            end
            _end_work
          rescue Exception
            # Exception (not just StandardError) so that no failure inside a
            # job can take the worker thread down.
            ZeevexConcurrency.logger.error %{Exception caught in thread pool: #{$!.inspect}: #{$!.backtrace.join("\n")}}
          end
        end
      end
      @group.add(thr)
    end

    @started = true
  end
end

#stop ⇒ Object



244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/zeevex_concurrency/thread_pool.rb', line 244

# Shuts the pool down, if it is running.
#
# Under @mutex: raises the stop flag that the worker loops check, kills every
# thread in the worker group, resets the busy counter to 0 and marks the pool
# as not started. The threads are killed, not joined.
#
# @return [false, nil] false after a shutdown, nil if the pool was not started
def stop
  @mutex.synchronize do
    return unless @started

    @stop_requested = true

    workers = @group.list
    workers.each(&:kill)

    @busy_count.set(0)
    @started = false
  end
end

#worker_count ⇒ Object



263
264
265
# File 'lib/zeevex_concurrency/thread_pool.rb', line 263

# Number of worker threads the pool is configured to run: the value chosen
# in the constructor, either the caller's +count+ or the CPU-based default.
#
# @return [Integer] configured worker count
def worker_count
  @count
end