Class: ZeevexConcurrency::ThreadPool::ThreadPerJobPool

Inherits:
Object
  • Object
show all
Includes:
Stubs
Defined in:
lib/zeevex_concurrency/thread_pool.rb

Overview

Launch a concurrent thread for every new task enqueued

Instance Method Summary collapse

Methods included from Stubs

#backlog, #busy?, #flush, #free_count

Constructor Details

#initializeThreadPerJobPool

Returns a new instance of ThreadPerJobPool.



132
133
134
135
136
137
138
# File 'lib/zeevex_concurrency/thread_pool.rb', line 132

def initialize
  @mutex = Mutex.new
  @group = ThreadGroup.new
  @busy_count = Atomic.new(0)

  start
end

Instance Method Details

#busyObject



179
180
181
# File 'lib/zeevex_concurrency/thread_pool.rb', line 179

def busy
  false
end

#busy_countObject



175
176
177
# File 'lib/zeevex_concurrency/thread_pool.rb', line 175

def busy_count
  @busy_count.value
end

#enqueue(runnable = nil, &block) ⇒ Object



140
141
142
143
144
145
146
147
148
149
# File 'lib/zeevex_concurrency/thread_pool.rb', line 140

def enqueue(runnable = nil, &block)
  raise "Must be started" unless @started
  callable = _check_args(runnable, block)
  thr = Thread.new do
    @busy_count.update {|x| x + 1}
    callable.call
    @busy_count.update {|x| x - 1}
  end
  @group.add(thr)
end

#joinObject



155
156
157
158
159
160
# File 'lib/zeevex_concurrency/thread_pool.rb', line 155

def join
  @group.list.dup.each do |thr|
    thr.join
  end
  true
end

#startObject



151
152
153
# File 'lib/zeevex_concurrency/thread_pool.rb', line 151

def start
  @started = true
end

#stopObject



162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/zeevex_concurrency/thread_pool.rb', line 162

def stop
  @mutex.synchronize do
    return unless @started

    @group.list.dup.each do |thr|
      thr.kill
    end

    @started = false
    @busy_count.set 0
  end
end

#worker_countObject



183
184
185
# File 'lib/zeevex_concurrency/thread_pool.rb', line 183

def worker_count
  @busy_count.value
end