Class: AWS::Flow::WorkflowTaskPoller

Inherits:
Object
Defined in:
lib/aws/decider/task_poller.rb

Constructor Details

#initialize(service, domain, handler, task_list, options = nil) ⇒ WorkflowTaskPoller

Creates a new WorkflowTaskPoller.

Parameters:

  • service

    The Amazon SWF service object on which this task poller will operate.

  • domain (String)

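    The domain containing the task lists to poll. Although the type is documented as a String, #get_decision_task calls decision_tasks on this value, so in practice it must be a domain object that responds to that method.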

  • handler (DecisionTaskHandler)

    A DecisionTaskHandler to handle polled tasks. The poller will call the DecisionTaskHandler#handle_decision_task method.

  • task_list (Array)

    Specifies the task list to poll for decision tasks.

  • options (Object) (defaults to: nil)

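    An optional object that supplies the logger through its logger method. If omitted, or if it returns no logger, a default logger is created with Utilities::LogFactory.make_logger.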



# File 'lib/aws/decider/task_poller.rb', line 40

def initialize(service, domain, handler, task_list, options=nil)
  @service = service
  @handler = handler
  @domain = domain
  @task_list = task_list
  @logger = options.logger if options
  @logger ||= Utilities::LogFactory.make_logger(self)
end
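
The logger selection in the constructor can be sketched in isolation. In the sketch below, SketchPoller and PollerOptions are hypothetical stand-ins that are not part of the library, and Logger.new($stdout) is an assumed substitute for Utilities::LogFactory.make_logger, whose behavior is not shown on this page.

```ruby
require 'logger'

# Hypothetical stand-in for the options object; the constructor only reads `logger`.
PollerOptions = Struct.new(:logger)

# Hypothetical class that reproduces only the logger selection logic.
class SketchPoller
  attr_reader :logger

  def initialize(options = nil)
    # Use the caller's logger when an options object is supplied.
    @logger = options.logger if options
    # Otherwise (or if options.logger is nil) fall back to a default logger.
    # Assumption: Logger.new($stdout) stands in for the library's LogFactory.
    @logger ||= Logger.new($stdout)
  end
end

custom_logger = Logger.new($stderr)
SketchPoller.new(PollerOptions.new(custom_logger)).logger  # the custom logger
SketchPoller.new.logger                                    # a default Logger
```

Because of the `||=` fallback, passing an options object whose logger is nil behaves the same as passing no options at all.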

Instance Method Details

#get_decision_task ⇒ Object

This method is part of a private API. Avoid using it if possible, because it may be removed or changed in the future.

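Polls the task list for a single decision task that is ready. #poll_and_process_single_task treats a nil result as meaning that no task was available.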



# File 'lib/aws/decider/task_poller.rb', line 51

def get_decision_task
  @domain.decision_tasks.poll_for_single_task(@task_list)
end

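#poll_and_process_single_task ⇒ Object

Polls for a single decision task, passes it to the handler, and sends the resulting decisions back to the service. Returns false if no task was available. If the service rejects the response because a field exceeds its length limit, the decisions are replaced with a single FailWorkflowExecution decision and the response is sent again. Errors raised along the way are logged rather than propagated.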



# File 'lib/aws/decider/task_poller.rb', line 55

def poll_and_process_single_task
  # TODO waitIfSuspended
  begin
    @logger.debug "Polling for a new decision task of type #{@handler.workflow_definition_map.keys.map{ |x| "#{x.name} #{x.version}"} } on task_list: #{@task_list}"
    task = get_decision_task
    if task.nil?
      @logger.debug "Didn't get a task on task_list: #{@task_list}"
      return false
    end
    @logger.info Utilities.workflow_task_to_debug_string("Got decision task", task, @task_list)

    task_completed_request = @handler.handle_decision_task(task)
    @logger.debug "Response to the task will be #{task_completed_request.inspect}"

    if !task_completed_request[:decisions].empty? && (task_completed_request[:decisions].first.keys.include?(:fail_workflow_execution_decision_attributes))
      fail_hash = task_completed_request[:decisions].first[:fail_workflow_execution_decision_attributes]
      reason = fail_hash[:reason]
      details = fail_hash[:details]
    end

    begin
      @service.respond_decision_task_completed(task_completed_request)
    rescue AWS::SimpleWorkflow::Errors::ValidationException => e
      if e.message.include? "failed to satisfy constraint: Member must have length less than or equal to"
        # We want to ensure that the WorkflowWorker doesn't just sit around and
        # time the workflow out. If there is a validation failure possibly
        # because of large inputs to child workflows/activities or large custom
        # exceptions we should fail the workflow with some minimal details.
        task_completed_request[:decisions] = [
          {
            decision_type: "FailWorkflowExecution",
            fail_workflow_execution_decision_attributes: {
              reason: Utilities.validation_error_string("Workflow"),
              details: "AWS::SimpleWorkflow::Errors::ValidationException"
            }
          }
        ]
        @service.respond_decision_task_completed(task_completed_request)
      end
      @logger.error "#{task.workflow_type.inspect} failed with exception: #{e.inspect}"
    end
    @logger.info Utilities.workflow_task_to_debug_string("Finished executing task", task, @task_list)
  rescue AWS::SimpleWorkflow::Errors::UnknownResourceFault => e
    @logger.error "Error in the poller, #{e.inspect}"
  rescue Exception => e
    @logger.error "Error in the poller, #{e.inspect}"
  end
end
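
The validation fallback in the method above can be illustrated with a self-contained sketch. Everything named Sketch* below is hypothetical: SketchValidationError stands in for AWS::SimpleWorkflow::Errors::ValidationException, SketchService imitates a service that rejects oversized requests, and the reason string is a placeholder rather than the output of Utilities.validation_error_string. The return symbols are also an addition for illustration; the original method logs the outcome instead.

```ruby
# Hypothetical stand-in for AWS::SimpleWorkflow::Errors::ValidationException.
class SketchValidationError < StandardError; end

# Hypothetical service stub: rejects requests whose inspected form is longer
# than `limit` characters and records the requests it accepts.
class SketchService
  attr_reader :accepted

  def initialize(limit)
    @limit = limit
    @accepted = []
  end

  def respond_decision_task_completed(request)
    if request.inspect.length > @limit
      raise SketchValidationError,
            "failed to satisfy constraint: Member must have length less than or equal to #{@limit}"
    end
    @accepted << request
  end
end

# Sends the request. On a length-constraint failure, swaps the decisions for
# one minimal FailWorkflowExecution decision and sends the request once more,
# so the workflow fails promptly instead of timing out.
def respond_with_fallback(service, request)
  service.respond_decision_task_completed(request)
  :sent
rescue SketchValidationError => e
  # Other validation failures are not recovered here.
  return :not_recovered unless e.message.include?("Member must have length less than or equal to")

  request[:decisions] = [
    {
      decision_type: "FailWorkflowExecution",
      fail_workflow_execution_decision_attributes: {
        reason: "Validation error", # placeholder text, not the library's message
        details: e.class.name
      }
    }
  ]
  service.respond_decision_task_completed(request)
  :failed_workflow
end

service = SketchService.new(400)
oversized = {
  task_token: "token",
  decisions: [{ decision_type: "ScheduleActivityTask",
                schedule_activity_task_decision_attributes: { input: "x" * 1000 } }]
}
respond_with_fallback(service, oversized)
```

The fallback request carries only a short reason and the exception class name, so it should fit within the same limit that rejected the original request.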