Class: AWS::Flow::ForkingExecutor
- Inherits:
-
Object
- Object
- AWS::Flow::ForkingExecutor
- Defined in:
- lib/aws/decider/executor.rb
Class Attribute Summary collapse
-
.executors ⇒ Object
Returns the value of attribute executors.
Instance Attribute Summary collapse
-
#is_shutdown ⇒ Object
Returns the value of attribute is_shutdown.
-
#max_workers ⇒ Object
Returns the value of attribute max_workers.
-
#pids ⇒ Object
Returns the value of attribute pids.
Instance Method Summary collapse
- #block_on_max_workers ⇒ Object
- #execute(&block) ⇒ Object
-
#initialize(options = {}) ⇒ ForkingExecutor
constructor
A new instance of ForkingExecutor.
- #shutdown(timeout_seconds) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ ForkingExecutor
Returns a new instance of ForkingExecutor.
49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/aws/decider/executor.rb', line 49
#
# Builds a new executor and registers it in ForkingExecutor.executors.
#
# @param options [Hash] optional settings
# @option options [Logger] :logger logger to use; when absent a Logger
#   writing to "#{Dir.tmpdir}/forking_log" is created
# @option options [Integer] :log_level severity handed to Logger#level=;
#   defaults to Logger::DEBUG
# @option options [Integer] :max_workers value compared against the number
#   of tracked child pids by block_on_max_workers; defaults to 1
def initialize(options = {})
  @log = options[:logger] || Logger.new("#{Dir.tmpdir}/forking_log")
  @semaphore = Mutex.new
  # :log_level used to be read into an unused local while the level was
  # hard-coded to DEBUG. Honour the option, keeping DEBUG as the default so
  # callers that never passed it see the same output as before.
  @log.level = options[:log_level] || Logger::DEBUG
  @log.info("LOG INITIALIZED")
  @max_workers = options[:max_workers] || 1
  @pids = []
  @is_shutdown = false
  # Class-level registry of every executor created in this process.
  ForkingExecutor.executors ||= []
  ForkingExecutor.executors << self
end
Class Attribute Details
.executors ⇒ Object
Returns the value of attribute executors.
45 46 47 |
# File 'lib/aws/decider/executor.rb', line 45
#
# Reader for the executors attribute.
#
# @return [Object] the current value of @executors (nil until it is assigned)
def executors
  @executors
end
Instance Attribute Details
#is_shutdown ⇒ Object
Returns the value of attribute is_shutdown.
47 48 49 |
# File 'lib/aws/decider/executor.rb', line 47
#
# Reader for the is_shutdown attribute.
#
# @return [Object] the current value of @is_shutdown (nil until it is assigned)
def is_shutdown
  @is_shutdown
end
#max_workers ⇒ Object
Returns the value of attribute max_workers.
47 48 49 |
# File 'lib/aws/decider/executor.rb', line 47
#
# Reader for the max_workers attribute.
#
# @return [Object] the current value of @max_workers (nil until it is assigned)
def max_workers
  @max_workers
end
#pids ⇒ Object
Returns the value of attribute pids.
47 48 49 |
# File 'lib/aws/decider/executor.rb', line 47
#
# Reader for the pids attribute.
#
# @return [Object] the current value of @pids (nil until it is assigned)
def pids
  @pids
end
Instance Method Details
#block_on_max_workers ⇒ Object
117 118 119 120 121 122 123 124 125 126 |
# File 'lib/aws/decider/executor.rb', line 117
#
# Holds the caller until fewer than @max_workers pids are being tracked.
#
# Does nothing beyond a debug log line when @pids.size is already below
# @max_workers. Otherwise it logs once and keeps calling
# remove_completed_pids(true) until the count drops below the limit.
# NOTE(review): remove_completed_pids is defined outside this listing;
# presumably the `true` argument makes it wait for a child - confirm.
#
# @return [nil]
def block_on_max_workers
  @log.debug "block_on_max_workers workers=#{@pids.size}, max_workers=#{@max_workers}"
  return if @pids.size < @max_workers

  # Single-line literal: the earlier backslash continuation inside the string
  # leaked the next line's leading whitespace into the logged message.
  @log.info "Reached maximum number of workers (#{@max_workers}), waiting for some to finish"
  remove_completed_pids(true) while @pids.size >= @max_workers
end
#execute(&block) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/aws/decider/executor.rb', line 63
#
# Runs the given block in a forked child process and records the child's pid.
#
# The parent first calls block_on_max_workers, then forks. The child ignores
# TERM, INT, HUP and USR2, runs the block, and leaves through Process.exit!
# with status 0 on success or 1 when the block raises.
#
# @yield the work to run inside the child process
# @return [Array] @pids with the new child pid appended
# @raise [RejectedExecutionException] if @is_shutdown is set
def execute(&block)
  @log.info "Here are the pids that are currently running #{@pids}"
  raise RejectedExecutionException if @is_shutdown

  block_on_max_workers
  @log.debug "PARENT BEFORE FORK #{Process.pid}"
  child_pid = fork do
    begin
      @log.debug "CHILD #{Process.pid}"
      # TODO: which signals to ignore?
      # ignore signals in the child
      %w[TERM INT HUP SIGUSR2].each { |signal| Signal.trap(signal, 'SIG_IGN') }
      block.call
      @log.debug "CHILD #{Process.pid} AFTER block.call"
      Process.exit!(0)
    rescue SystemExit
      # An explicit `exit` inside the block keeps its previous behavior.
      raise
    rescue Exception => e
      # This is the child's top level and it terminates immediately, so
      # catching Exception is deliberate: errors outside StandardError
      # (NotImplementedError, LoadError, ...) used to unwind out of the fork
      # block, letting the child run at_exit handlers inherited from the
      # parent without logging the failure.
      @log.error e
      @log.error "Definitely dying off right here"
      Process.exit!(1)
    end
  end
  @log.debug "PARENT AFTER FORK #{Process.pid}, child_pid=#{child_pid}"
  @pids << child_pid
end
#shutdown(timeout_seconds) ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/aws/decider/executor.rb', line 87
#
# Marks the executor as shut down, waits for tracked children to finish and
# sends KILL to any that remain.
#
# @param timeout_seconds [Numeric] how long to wait, polling once per second.
#   Float::INFINITY waits until every child has finished, which allows a
#   graceful shutdown of workers. Fractional values are rounded up to whole
#   seconds.
# @return [void]
def shutdown(timeout_seconds)
  @is_shutdown = true
  remove_completed_pids
  return if @pids.empty?

  if timeout_seconds == Float::INFINITY
    @log.info "Exit requested, waiting indefinitely till all child processes finish"
    remove_completed_pids(true) until @pids.empty?
  else
    # Logged once; it used to be emitted both before and inside this branch,
    # and also on the infinite path as "up to Infinity seconds".
    @log.info "Exit requested, waiting up to #{timeout_seconds} seconds for child processes to finish"
    # check every second for child processes to finish; #ceil lets a Float
    # timeout work, where Float#times does not exist
    timeout_seconds.ceil.times do
      sleep 1
      remove_completed_pids
      break if @pids.empty?
    end
  end
  return if @pids.empty?

  # forcibly kill all remaining children
  @log.warn "Child processes still running, sending KILL signal: #{@pids.join(',')}"
  @pids.each do |pid|
    begin
      Process.kill('KILL', pid)
    rescue Errno::ESRCH
      # The pid no longer exists; keep going so the remaining children are
      # still signalled.
      @log.debug "Child #{pid} already gone before KILL"
    end
  end
end