Class: AWS::Flow::ForkingExecutor Private

Inherits:
Object
  • Object
show all
Defined in:
lib/aws/decider/executor.rb

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Class Attribute Summary collapse

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ ForkingExecutor

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of ForkingExecutor.



51
52
53
54
55
56
57
58
59
60
61
# File 'lib/aws/decider/executor.rb', line 51

def initialize(options = {})
  unless @log = options[:logger]
    @log = Utilities::LogFactory.make_logger(self)
  end
  @semaphore = Mutex.new
  @max_workers = options[:max_workers] || 1
  @pids = []
  @is_shutdown = false
  ForkingExecutor.executors ||= []
  ForkingExecutor.executors << self
end

Class Attribute Details

.executorsObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



47
48
49
# File 'lib/aws/decider/executor.rb', line 47

def executors
  @executors
end

Instance Attribute Details

#is_shutdownObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



49
50
51
# File 'lib/aws/decider/executor.rb', line 49

def is_shutdown
  @is_shutdown
end

#max_workersObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



49
50
51
# File 'lib/aws/decider/executor.rb', line 49

def max_workers
  @max_workers
end

#pidsObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



49
50
51
# File 'lib/aws/decider/executor.rb', line 49

def pids
  @pids
end

Instance Method Details

#block_on_max_workersObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



118
119
120
121
122
123
124
125
126
127
# File 'lib/aws/decider/executor.rb', line 118

def block_on_max_workers
  @log.debug "block_on_max_workers workers=#{@pids.size}, max_workers=#{@max_workers}"
  if @pids.size >= @max_workers
    @log.info "Reached maximum number of workers (#{@max_workers}), waiting for some to finish"
    begin
      remove_completed_pids(true)
    end while @pids.size >= @max_workers
  end
  @log.debug "Available workers: #{@max_workers - @pids.size} out of #{@max_workers}"
end

#execute(&block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/aws/decider/executor.rb', line 63

def execute(&block)
  @log.debug "Currently running pids: #{@pids}"
  raise RejectedExecutionException if @is_shutdown
  block_on_max_workers
  @log.debug "Creating a new child process: parent=#{Process.pid}"
  child_pid = fork do
    begin
      @log.debug "Inside the new child process: parent=#{Process.ppid}, child_pid=#{Process.pid}"
      # TODO: which signals to ignore?
      # ignore signals in the child
      %w{ TERM INT HUP SIGUSR2 }.each { |signal| Signal.trap(signal, 'SIG_IGN') }
      @log.debug "Executing block from child process: parent=#{Process.ppid}, child_pid=#{Process.pid}"
      block.call
      @log.debug "Exiting from child process: parent=#{Process.ppid}, child_pid=#{Process.pid}"
      Process.exit!(0)
    rescue => e
      @log.error "child_pid=#{Process.pid} failed while executing the task: #{e}. Exiting: parent=#{Process.ppid}, child_pid=#{Process.pid}"
      Process.exit!(1)
    end
  end
  @log.debug "Created a new child process: parent=#{Process.pid}, child_pid=#{child_pid}"
  @pids << child_pid
end

#shutdown(timeout_seconds) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/aws/decider/executor.rb', line 87

def shutdown(timeout_seconds)
  @log.debug "Shutdown requested. Currently running pids: #{@pids}"
  @is_shutdown = true
  remove_completed_pids

  unless @pids.empty?
    # If the timeout_seconds value is set to Float::INFINITY, it will wait
    # indefinitely till all workers finish their work. This allows us to
    # handle graceful shutdown of workers.
    if timeout_seconds == Float::INFINITY
      @log.info "Exit requested, waiting indefinitely till all child processes finish"
      remove_completed_pids true while !@pids.empty?
    else
      @log.info "Exit requested, waiting up to #{timeout_seconds} seconds for child processes to finish"
      # check every second for child processes to finish
      timeout_seconds.times do
        sleep 1
        remove_completed_pids
        break if @pids.empty?
      end
    end

    # forcibly kill all remaining children
    unless @pids.empty?
      @log.warn "Child processes #{@pids} still running, sending KILL signal: #{@pids.join(',')}"
      @pids.each { |pid| Process.kill('KILL', pid) }
    end
  end
end