Class: AWS::Flow::AsyncRetryingExecutor

Inherits:
Object
  • Object
show all
Defined in:
lib/aws/decider/async_retrying_executor.rb

Instance Method Summary collapse

Constructor Details

#initialize(retrying_policy, clock, execution_id, return_on_start = false) ⇒ AsyncRetryingExecutor

Returns a new instance of AsyncRetryingExecutor.



20
21
22
23
24
25
# File 'lib/aws/decider/async_retrying_executor.rb', line 20

# Creates an executor that runs commands and re-schedules them according to
# a retry policy.
#
# @param retrying_policy policy object consulted on failure; this class
#   calls +isRetryable+ and +next_retry_delay_seconds+ on it
# @param clock object providing +current_time+ and +create_timer+
# @param execution_id value forwarded to the policy's
#   +next_retry_delay_seconds+
# @param return_on_start when truthy, +execute+ and +invoke+ hand back
#   their result object immediately instead of waiting on it
def initialize(retrying_policy, clock, execution_id, return_on_start = false)
  @retrying_policy, @clock = retrying_policy, clock
  @execution_id = execution_id
  @return_on_start = return_on_start
end

Instance Method Details

#execute(command, options = nil) ⇒ Object

Raises:

  • (Exception) — the error captured while running the command, if any


26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/aws/decider/async_retrying_executor.rb', line 26

# Runs +command+ through the retry machinery.
#
# When the executor was built with +return_on_start+ truthy, the result of
# +schedule_with_retry+ is returned directly. Otherwise the call is wrapped
# in an +error_handler+; any error it reports is re-raised to the caller
# once the handler's ensure block has run.
#
# @param command callable forwarded to +schedule_with_retry+
# @param options unused by this method; kept for interface compatibility
# @return the value of +schedule_with_retry+ when +return_on_start+ is
#   truthy, otherwise an AddressableFuture set to that value
# @raise [Exception] the error captured by the handler, if any
def execute(command, options = nil)
  if @return_on_start
    return schedule_with_retry(command, nil, Hash.new { |hash, key| hash[key] = 1 }, @clock.current_time, nil)
  end

  output = Utilities::AddressableFuture.new
  result_lock = Utilities::AddressableFuture.new
  # Held in a local rather than an instance variable so that a failure seen
  # by one call can never be re-raised by a later, successful call on the
  # same executor.
  error_seen = nil

  error_handler do |t|
    t.begin do
      output.set(schedule_with_retry(command, nil, Hash.new { |hash, key| hash[key] = 1 }, @clock.current_time, nil))
    end
    t.rescue(Exception) do |error|
      error_seen = error
    end
    t.ensure do
      # Make sure the output is always marked set, even on failure.
      output.set unless output.set?
      result_lock.set
    end
  end

  # NOTE(review): presumably blocks until the ensure block above has run;
  # confirm against the Future implementation.
  result_lock.get
  raise error_seen if error_seen
  output
end

#invoke(command, attempts, first_attempt_time) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/aws/decider/async_retrying_executor.rb', line 63

# Calls +command+ once and, if it fails, hands the failure back to
# +schedule_with_retry+ for another attempt.
#
# @param command callable to run
# @param attempts hash-like counter keyed by failure class; the entry for
#   the failure's class is incremented before rescheduling
# @param first_attempt_time passed through unchanged to +schedule_with_retry+
# @return the output future when +return_on_start+ is truthy, otherwise the
#   value obtained from that future
def invoke(command, attempts, first_attempt_time)
  caught = nil
  retry_signal = Future.new
  command_result = Future.new
  output = Utilities::AddressableFuture.new

  error_handler do |t|
    t.begin do
      command_result.set(command.call)
    end
    t.rescue(Exception) do |error|
      caught = error
      # Cancellation is recorded and then propagated out of the handler.
      raise error if error.is_a?(CancellationException)
    end
    t.ensure do
      # Always publish the outcome: nil on success, the error otherwise.
      retry_signal.set(caught)
    end
  end

  task do
    failure = retry_signal.get
    if failure.nil?
      output.set(command_result.get)
    else
      attempts[failure.class] += 1
      output.set(schedule_with_retry(command, failure, attempts, first_attempt_time, @clock.current_time))
    end
  end

  @return_on_start ? output : output.get
end

#schedule_with_retry(command, failure, attempts, first_attempt_time, time_of_recorded_failure) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/aws/decider/async_retrying_executor.rb', line 47

# Decides whether +command+ may be attempted (again) and either invokes it
# right away or schedules the invocation on a timer.
#
# The retry policy is only consulted once the summed attempt counts exceed
# one; before that the command is invoked immediately.
#
# @param command callable handed to +invoke+
# @param failure the error from the previous attempt, or nil on the first
# @param attempts hash of attempt counts; its values are summed
# @param first_attempt_time forwarded to the policy and to +invoke+
# @param time_of_recorded_failure forwarded to the policy
# @return the result of +invoke+, or of the +task+ block when a timer is used
# @raise +failure+ when the policy reports it as not retryable or returns a
#   negative delay
def schedule_with_retry(command, failure, attempts, first_attempt_time, time_of_recorded_failure)
  total_attempts = attempts.values.inject(0) { |sum, count| sum + count }
  retry_delay = -1

  if total_attempts > 1
    raise failure unless @retrying_policy.isRetryable(failure)
    retry_delay = @retrying_policy.next_retry_delay_seconds(
      first_attempt_time, time_of_recorded_failure, attempts, failure, @execution_id
    )
    raise failure if retry_delay < 0
  end

  # Zero delay, or no prior failure: run immediately.
  return invoke(command, attempts, first_attempt_time) unless retry_delay > 0

  task do
    @clock.create_timer(retry_delay, lambda { invoke(command, attempts, first_attempt_time) })
  end
end