Class: ProcessPool

Inherits:
Object
  • Object
show all
Defined in:
lib/process_pool.rb

Defined Under Namespace

Classes: EndTask, InvalidStateError

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(workers_count, queue = SimpleQueue.create, logger = SimpleLogger.new) ⇒ ProcessPool

Returns a new instance of ProcessPool.



10
11
12
13
14
15
16
17
# File 'lib/process_pool.rb', line 10

def initialize(workers_count, queue = SimpleQueue.create, logger = SimpleLogger.new)
  self.state = :stopped
  self.logger = logger
  self.workers_count = workers_count
  self.queue = queue
  self.worker_pids = []
  self.extensions = []
end

Instance Attribute Details

#workers_countObject

Returns the value of attribute workers_count.



8
9
10
# File 'lib/process_pool.rb', line 8

def workers_count
  @workers_count
end

Instance Method Details

#is_running?Boolean

Returns:

  • (Boolean)


77
78
79
# File 'lib/process_pool.rb', line 77

def is_running?
  return state == :running
end

#is_shutdown?Boolean

Returns:

  • (Boolean)


85
86
87
# File 'lib/process_pool.rb', line 85

def is_shutdown?
  return state == :shutdown
end

#is_stopped?Boolean

Returns:

  • (Boolean)


81
82
83
# File 'lib/process_pool.rb', line 81

def is_stopped?
  return state == :stopped
end

#register_extension(extension) ⇒ Object

Raises:



25
26
27
28
29
30
31
32
33
# File 'lib/process_pool.rb', line 25

def register_extension(extension)
  raise InvalidStateError.new('Can not register extensions once the pool is started.') unless is_stopped?
  raise ArgumentError.new('extension can not be nil') unless extension

  extension.process_pool = self if extension.respond_to?(:process_pool=)
  extension.logger = logger if extension.respond_to?(:logger=)

  extensions << extension
end

#schedule(job_class, *args) ⇒ Object

Raises:



19
20
21
22
23
# File 'lib/process_pool.rb', line 19

def schedule(job_class, *args)
  raise InvalidStateError.new('Can not add more jobs after shut down was called') if is_shutdown?
  logger.debug("Scheduling task #{job_class}(#{args.collect{ |a| a.inspect }.join(', ')})")
  push_task(job_class, args)
end

#shutdownObject

Raises:



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/process_pool.rb', line 61

def shutdown
  raise InvalidStateError.new('Can not shut down pool that is not running') unless is_running?
  logger.info("Shutting down process pool")
  self.state = :shutdown

  workers_count.times do
    push_task(EndTask, [])
  end

  worker_pids.each do |pid|
    Process.wait(pid)
  end

  queue.close
end

#startObject

Raises:



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/process_pool.rb', line 35

def start
  raise InvalidStateError.new('Can not start a pool more than once') unless is_stopped?
  logger.info("Starting process pool")
  self.state = :running

  workers_count.times do
    pid = fork do
      run_extensions(:startup)
      at_exit { run_extensions(:shutdown) }
      child_queue = get_child_queue()
      while true
        task_class, args = child_queue.pop
        begin
          task = get_task_class(task_class).new(*args)
          run_extensions(:before, task) unless task.is_a?(EndTask)
          result = task.run
          run_extensions(:after, task, result)
        rescue => e
          logger.warn("Exception occurred while executing task #{task_class}(#{args}): #{e}")
        end
      end
    end
    self.worker_pids << pid
  end
end