Class: ProcessPool

Inherits:
Object
  • Object
show all
Defined in:
lib/process_pool.rb

Defined Under Namespace

Classes: EndTask, InvalidStateError

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(workers_count, queue = SimpleQueue.create, logger = SimpleLogger.new) ⇒ ProcessPool

Returns a new instance of ProcessPool.



8
9
10
11
12
13
14
# File 'lib/process_pool.rb', line 8

def initialize(workers_count, queue = SimpleQueue.create, logger = SimpleLogger.new)
  self.state = :stopped
  self.logger = logger
  self.workers_count = workers_count
  self.queue = queue
  self.worker_pids = []
end

Instance Attribute Details

#workers_countObject

Returns the value of attribute workers_count.



6
7
8
# File 'lib/process_pool.rb', line 6

def workers_count
  @workers_count
end

Instance Method Details

#is_running?Boolean

Returns:

  • (Boolean)


58
59
60
# File 'lib/process_pool.rb', line 58

def is_running?
  return state == :running
end

#is_shutdown?Boolean

Returns:

  • (Boolean)


66
67
68
# File 'lib/process_pool.rb', line 66

def is_shutdown?
  return state == :shutdown
end

#is_stopped?Boolean

Returns:

  • (Boolean)


62
63
64
# File 'lib/process_pool.rb', line 62

def is_stopped?
  return state == :stopped
end

#schedule(job_class, *args) ⇒ Object

Raises:



16
17
18
19
20
# File 'lib/process_pool.rb', line 16

def schedule(job_class, *args)
  raise InvalidStateError.new('Can not add more jobs after shut down was called') if is_shutdown?
  logger.debug("Scheduling task #{job_class}(#{args})")
  push_task(job_class, args)
end

#shutdownObject

Raises:



44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/process_pool.rb', line 44

def shutdown
  raise InvalidStateError.new('Can not shut down pool that is not running') unless is_running?
  logger.info("Shutting down process pool")
  self.state = :shutdown

  workers_count.times do
    push_task(EndTask, [])
  end

  worker_pids.each do |pid|
    Process.wait(pid)
  end
end

#startObject

Raises:



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/process_pool.rb', line 22

def start
  raise InvalidStateError.new('Can not start a pool more than once') unless is_stopped?
  logger.info("Starting process pool")
  self.state = :running

  workers_count.times do
    pid = fork do
      child_queue = get_child_queue()
      while true
        task_class, args = child_queue.pop
        begin
          task = get_task_class(task_class).new(*args)
          task.run
        rescue => e
          logger.warn("Exception occurred while executing task #{task_class}(#{args}): #{e}")
        end
      end
    end
    self.worker_pids << pid
  end
end