Class: ProcessPool
- Inherits:
-
Object
show all
- Defined in:
- lib/process_pool.rb
Defined Under Namespace
Classes: EndTask, InvalidStateError
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(workers_count, queue = SimpleQueue.create, logger = SimpleLogger.new) ⇒ ProcessPool
Returns a new instance of ProcessPool.
10
11
12
13
14
15
16
17
|
# File 'lib/process_pool.rb', line 10
def initialize(workers_count, queue = SimpleQueue.create, logger = SimpleLogger.new)
self.state = :stopped
self.logger = logger
self.workers_count = workers_count
self.queue = queue
self.worker_pids = []
self.extensions = []
end
|
Instance Attribute Details
#workers_count ⇒ Object
Returns the value of attribute workers_count.
8
9
10
|
# File 'lib/process_pool.rb', line 8
def workers_count
@workers_count
end
|
Instance Method Details
#is_running? ⇒ Boolean
77
78
79
|
# File 'lib/process_pool.rb', line 77
def is_running?
return state == :running
end
|
#is_shutdown? ⇒ Boolean
85
86
87
|
# File 'lib/process_pool.rb', line 85
def is_shutdown?
return state == :shutdown
end
|
#is_stopped? ⇒ Boolean
81
82
83
|
# File 'lib/process_pool.rb', line 81
def is_stopped?
return state == :stopped
end
|
#register_extension(extension) ⇒ Object
25
26
27
28
29
30
31
32
33
|
# File 'lib/process_pool.rb', line 25
def register_extension(extension)
raise InvalidStateError.new('Can not register extensions once the pool is started.') unless is_stopped?
raise ArgumentError.new('extension can not be nil') unless extension
extension.process_pool = self if extension.respond_to?(:process_pool=)
extension.logger = logger if extension.respond_to?(:logger=)
extensions << extension
end
|
#schedule(job_class, *args) ⇒ Object
19
20
21
22
23
|
# File 'lib/process_pool.rb', line 19
def schedule(job_class, *args)
raise InvalidStateError.new('Can not add more jobs after shut down was called') if is_shutdown?
logger.debug("Scheduling task #{job_class}(#{args.collect{ |a| a.inspect }.join(', ')})")
push_task(job_class, args)
end
|
#shutdown ⇒ Object
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
# File 'lib/process_pool.rb', line 61
def shutdown
raise InvalidStateError.new('Can not shut down pool that is not running') unless is_running?
logger.info("Shutting down process pool")
self.state = :shutdown
workers_count.times do
push_task(EndTask, [])
end
worker_pids.each do |pid|
Process.wait(pid)
end
queue.close
end
|
#start ⇒ Object
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
# File 'lib/process_pool.rb', line 35
def start
raise InvalidStateError.new('Can not start a pool more than once') unless is_stopped?
logger.info("Starting process pool")
self.state = :running
workers_count.times do
pid = fork do
run_extensions(:startup)
at_exit { run_extensions(:shutdown) }
child_queue = get_child_queue()
while true
task_class, args = child_queue.pop
begin
task = get_task_class(task_class).new(*args)
run_extensions(:before, task) unless task.is_a?(EndTask)
result = task.run
run_extensions(:after, task, result)
rescue => e
logger.warn("Exception occurred while executing task #{task_class}(#{args}): #{e}")
end
end
end
self.worker_pids << pid
end
end
|