Class: AWS::Flow::ForkingExecutor Private
- Inherits:
- Object
- Inheritance chain: AWS::Flow::ForkingExecutor < Object
- Defined in:
- lib/aws/decider/executor.rb
This class is part of a private API. You should avoid using this class if possible, as it may be removed or changed in the future.
Class Attribute Summary
- .executors ⇒ Object private
Instance Attribute Summary
- #is_shutdown ⇒ Object private
- #max_workers ⇒ Object private
- #pids ⇒ Object private
Instance Method Summary
- #block_on_max_workers ⇒ Object private
- #execute(&block) ⇒ Object private
- #initialize(options = {}) ⇒ ForkingExecutor constructor private
  A new instance of ForkingExecutor.
- #shutdown(timeout_seconds) ⇒ Object private
Constructor Details
#initialize(options = {}) ⇒ ForkingExecutor
This method is part of a private API. You should avoid using this method if possible, as it may be removed or changed in the future.
Returns a new instance of ForkingExecutor.
    # File 'lib/aws/decider/executor.rb', line 51
    def initialize(options = {})
      unless @log = options[:logger]
        @log = Utilities::LogFactory.make_logger(self)
      end
      @semaphore = Mutex.new
      @max_workers = options[:max_workers] || 1
      @pids = []
      @is_shutdown = false
      ForkingExecutor.executors ||= []
      ForkingExecutor.executors << self
    end
Class Attribute Details
.executors ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or changed in the future.
    # File 'lib/aws/decider/executor.rb', line 47
    def executors
      @executors
    end
Instance Attribute Details
#is_shutdown ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or changed in the future.
    # File 'lib/aws/decider/executor.rb', line 49
    def is_shutdown
      @is_shutdown
    end
#max_workers ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or changed in the future.
    # File 'lib/aws/decider/executor.rb', line 49
    def max_workers
      @max_workers
    end
#pids ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or changed in the future.
    # File 'lib/aws/decider/executor.rb', line 49
    def pids
      @pids
    end
Instance Method Details
#block_on_max_workers ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or changed in the future.
    # File 'lib/aws/decider/executor.rb', line 118
    def block_on_max_workers
      @log.debug "block_on_max_workers workers=#{@pids.size}, max_workers=#{@max_workers}"
      if @pids.size >= @max_workers
        @log.info "Reached maximum number of workers (#{@max_workers}), waiting for some to finish"
        begin
          remove_completed_pids(true)
        end while @pids.size >= @max_workers
      end
      @log.debug "Available workers: #{@max_workers - @pids.size} out of #{@max_workers}"
    end
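The listing above waits in a loop until the number of tracked child pids drops below `max_workers`. The helper it calls, `remove_completed_pids`, is not shown on this page, so the following is only a minimal standalone sketch of the same idea. `TinyForkPool`, `spawn`, `wait_for_slot`, and `reap` are hypothetical names, and `reap` is an assumption about what a blocking reap might do, not the library's implementation.

```ruby
# Hypothetical stand-in for the executor's pid bookkeeping; not the library's code.
class TinyForkPool
  attr_reader :pids

  def initialize(max_workers)
    @max_workers = max_workers
    @pids = []
  end

  # Fork a child for the block once a worker slot is free.
  def spawn(&block)
    wait_for_slot
    child_pid = fork do
      block.call
      Process.exit!(0)
    end
    @pids << child_pid
  end

  # Block until fewer than max_workers children are still tracked.
  def wait_for_slot
    reap
    reap(true) while @pids.size >= @max_workers
  end

  # Drop exited children from @pids. With blocking=true, first wait for
  # any one child to exit (assumed behavior of a blocking reap).
  def reap(blocking = false)
    if blocking && !@pids.empty?
      finished = Process.wait # blocks until any child exits
      @pids.delete(finished)
    end
    # Non-blocking sweep: waitpid returns nil while a child is still running.
    @pids.reject! { |pid| Process.waitpid(pid, Process::WNOHANG) }
  end
end

pool = TinyForkPool.new(2)
5.times { pool.spawn { sleep 0.1 } }
Process.waitall # collect the remaining children
```

With a cap of 2, the third `spawn` call does not fork until one of the first two children has exited, which is the back-pressure that `block_on_max_workers` provides for `#execute`.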
#execute(&block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or changed in the future.
    # File 'lib/aws/decider/executor.rb', line 63
    def execute(&block)
      @log.debug "Currently running pids: #{@pids}"
      raise RejectedExecutionException if @is_shutdown
      block_on_max_workers
      @log.debug "Creating a new child process: parent=#{Process.pid}"
      child_pid = fork do
        begin
          @log.debug "Inside the new child process: parent=#{Process.ppid}, child_pid=#{Process.pid}"
          # TODO: which signals to ignore?
          # ignore signals in the child
          %w{ TERM INT HUP SIGUSR2 }.each { |signal| Signal.trap(signal, 'SIG_IGN') }
          @log.debug "Executing block from child process: parent=#{Process.ppid}, child_pid=#{Process.pid}"
          block.call
          @log.debug "Exiting from child process: parent=#{Process.ppid}, child_pid=#{Process.pid}"
          Process.exit!(0)
        rescue => e
          @log.error "child_pid=#{Process.pid} failed while executing the task: #{e}. Exiting: parent=#{Process.ppid}, child_pid=#{Process.pid}"
          Process.exit!(1)
        end
      end
      @log.debug "Created a new child process: parent=#{Process.pid}, child_pid=#{child_pid}"
      @pids << child_pid
    end
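In the listing above, the child ignores a set of signals, runs the block, and exits with status 0 on success or 1 if the block raises. The sketch below reproduces only that fork-and-exit-status pattern using core Ruby, so it runs without the gem. `run_in_child` is a hypothetical helper name, and reading the status with `Process.wait2` is one way a parent could observe the outcome; this page does not show how the executor itself does it.

```ruby
# Hypothetical helper illustrating the fork pattern; not part of the library.
def run_in_child
  fork do
    begin
      # Ignore these signals in the child, as the listing above does.
      %w[TERM INT HUP USR2].each { |signal| Signal.trap(signal, 'SIG_IGN') }
      yield
      Process.exit!(0) # success: exit immediately, skipping at_exit handlers
    rescue => e
      warn "child_pid=#{Process.pid} failed: #{e}"
      Process.exit!(1) # failure is reported through the exit status
    end
  end
end

ok_pid  = run_in_child { 1 + 1 }
bad_pid = run_in_child { raise 'boom' }

_, ok_status  = Process.wait2(ok_pid)
_, bad_status = Process.wait2(bad_pid)

puts ok_status.exitstatus  # 0
puts bad_status.exitstatus # 1
```

`Process.exit!` does not raise `SystemExit`, so the successful exit is never caught by the `rescue` clause. Note that `rescue => e` only catches `StandardError` and its subclasses; other exceptions would terminate the child through Ruby's normal unhandled-exception path.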
#shutdown(timeout_seconds) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or changed in the future.
    # File 'lib/aws/decider/executor.rb', line 87
    def shutdown(timeout_seconds)
      @log.debug "Shutdown requested. Currently running pids: #{@pids}"
      @is_shutdown = true
      remove_completed_pids

      unless @pids.empty?
        # If the timeout_seconds value is set to Float::INFINITY, it will wait
        # indefinitely till all workers finish their work. This allows us to
        # handle graceful shutdown of workers.
        if timeout_seconds == Float::INFINITY
          @log.info "Exit requested, waiting indefinitely till all child processes finish"
          remove_completed_pids true while !@pids.empty?
        else
          @log.info "Exit requested, waiting up to #{timeout_seconds} seconds for child processes to finish"
          # check every second for child processes to finish
          timeout_seconds.times do
            sleep 1
            remove_completed_pids
            break if @pids.empty?
          end
        end

        # forcibly kill all remaining children
        unless @pids.empty?
          @log.warn "Child processes #{@pids} still running, sending KILL signal: #{@pids.join(',')}"
          @pids.each { |pid| Process.kill('KILL', pid) }
        end
      end
    end
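The finite-timeout branch above polls once per second and then sends KILL to whatever is left. Because it calls `timeout_seconds.times`, that branch needs an Integer (a non-infinite Float has no `times` method). The following standalone sketch shows the same poll-then-kill sequence; `stop_children` is a hypothetical function, and the non-blocking `Process.waitpid` sweep is an assumption standing in for `remove_completed_pids`, which is not shown on this page.

```ruby
# Hypothetical sketch of a poll-then-kill shutdown; not the library's code.
# Returns the pids that had to be force-killed.
def stop_children(pids, timeout_seconds)
  # Non-blocking sweep: drop every pid whose process has already exited.
  reap = lambda do
    pids.reject! { |pid| Process.waitpid(pid, Process::WNOHANG) }
  end

  reap.call
  # Check once per second, for up to timeout_seconds (an Integer).
  timeout_seconds.times do
    break if pids.empty?
    sleep 1
    reap.call
  end

  # Forcibly kill whatever is still running, then collect its exit status.
  killed = pids.dup
  killed.each do |pid|
    Process.kill('KILL', pid)
    Process.waitpid(pid)
  end
  pids.clear
  killed
end

fast = fork { Process.exit!(0) }
slow = fork { sleep 60 }
killed = stop_children([fast, slow], 1)
puts "force-killed: #{killed.inspect}"
```

Unlike the listing above, this sketch also waits on each killed child so that no zombie process is left behind; whether the executor does the same elsewhere is not visible from this page.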