Class: ProcessPool
- Inherits:
-
Object
show all
- Defined in:
- lib/process_pool.rb
Defined Under Namespace
Classes: EndTask, InvalidStateError
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(workers_count, queue = SimpleQueue.create, logger = SimpleLogger.new) ⇒ ProcessPool
Returns a new instance of ProcessPool.
8
9
10
11
12
13
14
|
# File 'lib/process_pool.rb', line 8
def initialize(workers_count, queue = SimpleQueue.create, logger = SimpleLogger.new)
self.state = :stopped
self.logger = logger
self.workers_count = workers_count
self.queue = queue
self.worker_pids = []
end
|
Instance Attribute Details
#workers_count ⇒ Object
Returns the value of attribute workers_count.
6
7
8
|
# File 'lib/process_pool.rb', line 6
def workers_count
@workers_count
end
|
Instance Method Details
#is_running? ⇒ Boolean
58
59
60
|
# File 'lib/process_pool.rb', line 58
def is_running?
return state == :running
end
|
#is_shutdown? ⇒ Boolean
66
67
68
|
# File 'lib/process_pool.rb', line 66
def is_shutdown?
return state == :shutdown
end
|
#is_stopped? ⇒ Boolean
62
63
64
|
# File 'lib/process_pool.rb', line 62
def is_stopped?
return state == :stopped
end
|
#schedule(job_class, *args) ⇒ Object
16
17
18
19
20
|
# File 'lib/process_pool.rb', line 16
def schedule(job_class, *args)
raise InvalidStateError.new('Can not add more jobs after shut down was called') if is_shutdown?
logger.debug("Scheduling task #{job_class}(#{args})")
push_task(job_class, args)
end
|
#shutdown ⇒ Object
44
45
46
47
48
49
50
51
52
53
54
55
56
|
# File 'lib/process_pool.rb', line 44
def shutdown
raise InvalidStateError.new('Can not shut down pool that is not running') unless is_running?
logger.info("Shutting down process pool")
self.state = :shutdown
workers_count.times do
push_task(EndTask, [])
end
worker_pids.each do |pid|
Process.wait(pid)
end
end
|
#start ⇒ Object
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
# File 'lib/process_pool.rb', line 22
def start
raise InvalidStateError.new('Can not start a pool more than once') unless is_stopped?
logger.info("Starting process pool")
self.state = :running
workers_count.times do
pid = fork do
child_queue = get_child_queue()
while true
task_class, args = child_queue.pop
begin
task = get_task_class(task_class).new(*args)
task.run
rescue => e
logger.warn("Exception occurred while executing task #{task_class}(#{args}): #{e}")
end
end
end
self.worker_pids << pid
end
end
|