Class: Threasy::Work
- Inherits:
-
Object
show all
- Defined in:
- lib/threasy/work.rb
Defined Under Namespace
Classes: TimeoutQueue, Worker
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize ⇒ Work
Returns a new instance of Work.
5
6
7
8
9
|
# File 'lib/threasy/work.rb', line 5
def initialize
@queue = TimeoutQueue.new
@pool = Set.new
@semaphore = Mutex.new
end
|
Instance Attribute Details
#pool ⇒ Object
Returns the value of attribute pool.
3
4
5
|
# File 'lib/threasy/work.rb', line 3
def pool
@pool
end
|
#queue ⇒ Object
Returns the value of attribute queue.
3
4
5
|
# File 'lib/threasy/work.rb', line 3
def queue
@queue
end
|
Instance Method Details
#add_worker(size) ⇒ Object
40
41
42
43
44
45
46
47
|
# File 'lib/threasy/work.rb', line 40
def add_worker(size)
log "Adding new worker to pool"
worker = Worker.new(self, size)
pool.add worker
worker.work
end
|
#check_workers ⇒ Object
29
30
31
32
33
34
35
36
37
38
|
# File 'lib/threasy/work.rb', line 29
def check_workers
sync do
pool_size = pool.size
queue_size = queue.size
log "Checking workers. Pool: #{pool_size}, Max: #{max_workers}, Queue: #{queue_size}"
if pool_size < max_workers
add_worker(pool_size) if pool_size == 0 || queue_size > max_workers
end
end
end
|
#clear ⇒ Object
49
50
51
|
# File 'lib/threasy/work.rb', line 49
def clear
queue.clear
end
|
#enqueue(job = nil, &block) ⇒ Object
Also known as:
enqueue_block
11
12
13
|
# File 'lib/threasy/work.rb', line 11
def enqueue(job = nil, &block)
queue.push(block_given? ? block : job).tap{ check_workers }
end
|
#grab ⇒ Object
21
22
23
|
# File 'lib/threasy/work.rb', line 21
def grab
queue.pop
end
|
#log(msg) ⇒ Object
53
54
55
|
# File 'lib/threasy/work.rb', line 53
def log(msg)
Threasy.logger.debug msg
end
|
#max_workers ⇒ Object
25
26
27
|
# File 'lib/threasy/work.rb', line 25
def max_workers
Threasy.config.max_workers
end
|
#sync(&block) ⇒ Object
17
18
19
|
# File 'lib/threasy/work.rb', line 17
def sync(&block)
@semaphore.synchronize &block
end
|