Class: AWS::Flow::AsyncRetryingExecutor Private

Inherits:
Object
  • Object
show all
Defined in:
lib/aws/decider/async_retrying_executor.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

internal AsyncRetryingExecutor class

Instance Method Summary collapse

Constructor Details

#initialize(retrying_policy, clock, execution_id, return_on_start = false) ⇒ AsyncRetryingExecutor

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of AsyncRetryingExecutor.



23
24
25
26
27
28
# File 'lib/aws/decider/async_retrying_executor.rb', line 23

def initialize(retrying_policy, clock, execution_id, return_on_start = false)
  @retrying_policy = retrying_policy
  @clock = clock
  @return_on_start = return_on_start
  @execution_id = execution_id
end

Instance Method Details

#execute(command, options = nil) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Raises:

  • (@error_seen)


31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/aws/decider/async_retrying_executor.rb', line 31

def execute(command, options = nil)
  return schedule_with_retry(command, nil, Hash.new { |hash, key| hash[key] = 1 }, @clock.current_time, nil) if @return_on_start
  output = Utilities::AddressableFuture.new
  result_lock = Utilities::AddressableFuture.new
  error_handler do |t|
    t.begin do
      output.set(schedule_with_retry(command, nil, Hash.new { |hash, key| hash[key] = 1 }, @clock.current_time, nil))
    end
    t.rescue(Exception) do |error|
      @error_seen = error
    end
    t.ensure do
      output.set unless output.set?
      result_lock.set
    end
  end
  result_lock.get
  raise @error_seen if @error_seen
  output
end

#invoke(command, attempts, first_attempt_time) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/aws/decider/async_retrying_executor.rb', line 70

def invoke(command, attempts, first_attempt_time)
  failure_to_retry = nil
  should_retry = Future.new
  return_value = Future.new
  output = Utilities::AddressableFuture.new
  error_handler do |t|
    t.begin { return_value.set(command.call) }
    t.rescue(Exception) do |error|
      failure_to_retry = error
      raise error if error.class <= CancellationException
    end
    t.ensure { should_retry.set(failure_to_retry) }
  end
  task do
    failure = should_retry.get
    if ! failure.nil?
      attempts[failure.class] += 1
      output.set(schedule_with_retry(command, failure, attempts, first_attempt_time, @clock.current_time))
    else
      output.set(return_value.get)
    end
    #to_return = return_value.set? ? return_value.get : nil
  end
  return output if @return_on_start
  output.get
end

#schedule_with_retry(command, failure, attempts, first_attempt_time, time_of_recorded_failure) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/aws/decider/async_retrying_executor.rb', line 53

def schedule_with_retry(command, failure, attempts, first_attempt_time, time_of_recorded_failure)
  delay = -1
  if attempts.values.reduce(0, :+) > 1
    raise failure unless @retrying_policy.isRetryable(failure)
    delay = @retrying_policy.next_retry_delay_seconds(first_attempt_time, time_of_recorded_failure, attempts, failure, @execution_id)
    raise failure if delay < 0
  end
  if delay > 0
    task do
      @clock.create_timer(delay, lambda { invoke(command, attempts, first_attempt_time) })
    end
  else
    invoke(command, attempts, first_attempt_time)
  end
end