Class: AWS::Flow::ForkingExecutor

Inherits:
Object
  • Object
show all
Defined in:
lib/aws/decider/executor.rb

Class Attribute Summary collapse

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ ForkingExecutor

Returns a new instance of ForkingExecutor.



49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/aws/decider/executor.rb', line 49

# Builds an executor and registers it in the class-level
# +ForkingExecutor.executors+ list.
#
# @param options [Hash] construction options
# @option options [Logger] :logger logger to use; when absent, a logger
#   writing to "#{Dir.tmpdir}/forking_log" is created
# @option options [Integer] :log_level severity threshold applied to the
#   logger (e.g. +Logger::WARN+); defaults to +Logger::DEBUG+
# @option options [Integer] :max_workers upper bound compared against the
#   number of tracked child pids; defaults to 1
def initialize(options = {})
  @log = options[:logger] || Logger.new("#{Dir.tmpdir}/forking_log")
  # Honor the caller's :log_level. It used to be read into a local and then
  # ignored, with the level forced to DEBUG; DEBUG stays the default so
  # callers that never passed the option see no change in verbosity.
  @log.level = options[:log_level] || Logger::DEBUG
  @log.info("LOG INITIALIZED")

  @semaphore = Mutex.new
  @max_workers = options[:max_workers] || 1
  @pids = []
  @is_shutdown = false

  # Lazily create the class-level registry, then record this instance in it.
  ForkingExecutor.executors ||= []
  ForkingExecutor.executors << self
end

Class Attribute Details

.executors ⇒ Object

Returns the value of attribute executors.



45
46
47
# File 'lib/aws/decider/executor.rb', line 45

# Class-level reader for +@executors+.
#
# +initialize+ creates this list on first use (+executors ||= []+) and
# appends every newly constructed executor to it.
#
# @return [Array<ForkingExecutor>, nil] the registered executors;
#   presumably +nil+ until the first executor is constructed (no other
#   writer is visible here)
def executors
  @executors
end

Instance Attribute Details

#is_shutdown ⇒ Object

Returns the value of attribute is_shutdown.



47
48
49
# File 'lib/aws/decider/executor.rb', line 47

# Reader for the shutdown flag.
#
# The flag is set to +false+ by +initialize+ and to +true+ by +shutdown+;
# once it is +true+, +execute+ raises +RejectedExecutionException+.
#
# @return [Boolean] whether +shutdown+ has been called on this executor
def is_shutdown
  @is_shutdown
end

#max_workers ⇒ Object

Returns the value of attribute max_workers.



47
48
49
# File 'lib/aws/decider/executor.rb', line 47

# Reader for the worker limit.
#
# Set in +initialize+ from +options[:max_workers]+, falling back to 1.
# +block_on_max_workers+ compares it against the number of tracked pids.
#
# @return [Integer] the configured limit (assumed numeric, since it is
#   compared with +@pids.size+)
def max_workers
  @max_workers
end

#pids ⇒ Object

Returns the value of attribute pids.



47
48
49
# File 'lib/aws/decider/executor.rb', line 47

# Reader for the list of tracked child process ids.
#
# Starts as an empty array in +initialize+; +execute+ appends the pid of
# each child it forks. Entries are presumably pruned by
# +remove_completed_pids+ (defined elsewhere in the file).
#
# @return [Array<Integer>] pids of forked children still being tracked
def pids
  @pids
end

Instance Method Details

#block_on_max_workers ⇒ Object



117
118
119
120
121
122
123
124
125
126
# File 'lib/aws/decider/executor.rb', line 117

# Waits until the number of tracked child pids drops below +@max_workers+.
#
# Returns immediately when the executor is under its limit. Otherwise it
# repeatedly calls +remove_completed_pids(true)+ until +@pids+ shrinks below
# the limit. That helper is defined elsewhere; the +true+ argument
# presumably makes it block until a child exits.
#
# @return [nil]
def block_on_max_workers
  @log.debug "block_on_max_workers workers=#{@pids.size}, max_workers=#{@max_workers}"
  return if @pids.size < @max_workers

  # Built from adjacent literals so the source line can wrap without
  # embedding the continuation line's indentation in the logged message.
  @log.info "Reached maximum number of workers (#{@max_workers}), " \
            "waiting for some to finish"

  # The limit is known to be reached here, so a plain `while` performs the
  # same number of iterations as a do-while would.
  remove_completed_pids(true) while @pids.size >= @max_workers
end

#execute(&block) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/aws/decider/executor.rb', line 63

# Runs the given block in a forked child process and tracks the child's pid.
#
# The call first waits (via +block_on_max_workers+) until the executor is
# under its worker limit. The child ignores TERM, INT, HUP and USR2, invokes
# the block, and terminates with +Process.exit!+ using status 0 on success
# or status 1 when the block raises a +StandardError+.
#
# @yield the work to perform inside the child process
# @return [Array<Integer>] the tracked pid list, including the new child
# @raise [RejectedExecutionException] if +shutdown+ has already been called
def execute(&block)
  @log.info "Here are the pids that are currently running #{@pids}"
  raise RejectedExecutionException if @is_shutdown

  block_on_max_workers
  @log.debug "PARENT BEFORE FORK #{Process.pid}"

  ignored_signals = %w[TERM INT HUP SIGUSR2]
  forked_pid = Process.fork do
    exit_status = 0
    begin
      @log.debug "CHILD #{Process.pid}"
      # TODO: which signals to ignore?
      # ignore signals in the child
      ignored_signals.each do |signal_name|
        Signal.trap(signal_name, 'SIG_IGN')
      end
      block.call
      @log.debug "CHILD #{Process.pid} AFTER block.call"
    rescue => error
      @log.error error
      @log.error "Definitely dying off right here"
      exit_status = 1
    end
    # exit! leaves the child immediately, without running at_exit handlers.
    Process.exit!(exit_status)
  end

  @log.debug "PARENT AFTER FORK #{Process.pid}, child_pid=#{forked_pid}"
  @pids << forked_pid
end

#shutdown(timeout_seconds) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/aws/decider/executor.rb', line 87

# Stops accepting new work and waits for tracked children to finish,
# forcibly killing any that outlive the timeout.
#
# Sets the shutdown flag (so +execute+ starts rejecting work), then polls
# +remove_completed_pids+ (defined elsewhere) roughly once per second until
# +@pids+ is empty or the timeout elapses. Any child still tracked after
# that is sent KILL.
#
# @param timeout_seconds [Numeric] how long to wait for children. A
#   fractional value is rounded up to whole seconds. +Float::INFINITY+
#   waits indefinitely until all workers finish, allowing a graceful
#   shutdown, and never sends KILL.
# @return [Array<Integer>, nil] the pid list if KILL had to be sent,
#   otherwise +nil+
def shutdown(timeout_seconds)
  @is_shutdown = true
  remove_completed_pids
  return if @pids.empty?

  if timeout_seconds == Float::INFINITY
    @log.info "Exit requested, waiting indefinitely till all child processes finish"
    remove_completed_pids(true) until @pids.empty?
  else
    @log.info "Exit requested, waiting up to #{timeout_seconds} seconds for child processes to finish"
    # Check every second for child processes to finish. `ceil` leaves an
    # Integer unchanged and lets a finite Float timeout (e.g. 2.5) work
    # instead of raising NoMethodError on Float#times.
    timeout_seconds.ceil.times do
      sleep 1
      remove_completed_pids
      break if @pids.empty?
    end
  end

  return if @pids.empty?

  # Forcibly kill all remaining children.
  @log.warn "Child processes still running, sending KILL signal: #{@pids.join(',')}"
  @pids.each do |pid|
    begin
      Process.kill('KILL', pid)
    rescue Errno::ESRCH
      # The child exited between the last poll and the kill. Keep going so
      # the remaining children are still signalled.
      @log.debug "Child process #{pid} already exited before KILL could be sent"
    end
  end
end