Class: AWS::Flow::AsyncDecider

Inherits:
Object
  • Object
show all
Defined in:
lib/aws/decider/async_decider.rb

Overview

The asynchronous decider class

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(workflow_definition_factory, history_helper, decision_helper) ⇒ AsyncDecider

Creates a new asynchronous decider



194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/aws/decider/async_decider.rb', line 194

def initialize(workflow_definition_factory, history_helper, decision_helper)
  @workflow_definition_factory = workflow_definition_factory
  @history_helper = history_helper
  @decision_helper = decision_helper
  @decision_task = history_helper.get_decision_task
  @workflow_clock = WorkflowClock.new(@decision_helper)

  @workflow_context = WorkflowContext.new(@decision_task, @workflow_clock)
  @activity_client = GenericActivityClient.new(@decision_helper, nil)
  @workflow_client = GenericWorkflowClient.new(@decision_helper, @workflow_context)
  @decision_context = DecisionContext.new(@activity_client, @workflow_client, @workflow_clock, @workflow_context, @decision_helper)
end

Instance Attribute Details

#decision_helperObject

Returns the value of attribute decision_helper.



191
192
193
# File 'lib/aws/decider/async_decider.rb', line 191

def decision_helper
  @decision_helper
end

#task_tokenObject

Returns the value of attribute task_token.



191
192
193
# File 'lib/aws/decider/async_decider.rb', line 191

def task_token
  @task_token
end

Instance Method Details

#complete_workflowObject

Registers a ‘CompleteWorkflowExecution` decision.



338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
# File 'lib/aws/decider/async_decider.rb', line 338

def complete_workflow
  return unless @completed && ! @unhandled_decision
  decision_id = [:SELF, nil]
  if @failure
    @decision_helper[decision_id] = make_fail_decision(decision_id, @failure)
  elsif @cancel_requested
        @decision_helper[decision_id] = make_cancel_decision(decision_id)
  else

    if ! @workflow_context.continue_as_new_options.nil?
      @decision_helper[decision_id] = continue_as_new_workflow(decision_id, @workflow_context.continue_as_new_options)
    else
      if @result.nil?
        @decision_helper[decision_id] = make_completion_decision(decision_id, {
                                                                   :decision_type => "CompleteWorkflowExecution"})
      else
        @decision_helper[decision_id] = make_completion_decision(decision_id, {
                                                                   :decision_type => "CompleteWorkflowExecution",
                                                                   :complete_workflow_execution_decision_attributes => {:result => @result.get}})
      end
    end
  end
end

#completed?true, false

Is the task completed?

Returns:

  • (true, false)

    Returns ‘true` if the task is completed; `false` otherwise.



367
368
369
# File 'lib/aws/decider/async_decider.rb', line 367

def completed?
  @completed
end

#continue_as_new_workflow(decision_id, continue_as_new_options) ⇒ Object

Continues this as a new workflow, using the provided decision and options.

Parameters:

  • decision_id (DecisionID)

    The decision id to use.

  • continue_as_new_options (WorkflowOptions)

    The options to use for the new workflow.



322
323
324
325
326
327
328
329
330
331
# File 'lib/aws/decider/async_decider.rb', line 322

def continue_as_new_workflow(decision_id, continue_as_new_options)
  result = {
    :decision_type => "ContinueAsNewWorkflowExecution",
  }

  task_list = continue_as_new_options.task_list ? {:task_list => {:name => continue_as_new_options.task_list}} : {}
  to_add = continue_as_new_options.get_options([:execution_start_to_close_timeout, :task_start_to_close_timeout, :child_policy, :tag_list, :workflow_type_version, :input], task_list)
  result[:continue_as_new_workflow_execution_decision_attributes] = to_add
  CompleteWorkflowStateMachine.new(decision_id, result)
end

#handle_activity_task_cancel_requested(event) ⇒ Object

Handler for the ‘:ActivityTaskCancelRequested` event.

Parameters:

  • event (Object)

    The event to process



446
447
448
449
# File 'lib/aws/decider/async_decider.rb', line 446

def handle_activity_task_cancel_requested(event)
  activity_id = event.attributes[:activity_id]
  @decision_helper[activity_id].consume(:handle_cancellation_initiated_event)
end

#handle_activity_task_scheduled(event) ⇒ Object

Handler for the ‘:ActivityTaskScheduled` event.

Parameters:

  • event (Object)

    The event to process



376
377
378
379
380
381
# File 'lib/aws/decider/async_decider.rb', line 376

def handle_activity_task_scheduled(event)
  activity_id = event.attributes[:activity_id]
  @decision_helper.activity_scheduling_event_id_to_activity_id[event.id] = activity_id
  @decision_helper[activity_id].consume(:handle_initiated_event)
  return @decision_helper[activity_id].done?
end

#handle_cancel_timer_failed(event) ⇒ Object

Handler for the ‘:CancelTimerFailed` event.

Parameters:

  • event (Object)

    The event to process



584
585
586
587
588
589
# File 'lib/aws/decider/async_decider.rb', line 584

def handle_cancel_timer_failed(event)
  handle_event(event, {
                 :id_methods => [:timer_id],
                 :consume_symbol => :handle_cancellation_failure_event
               })
end

#handle_cancel_workflow_execution_failed(event) ⇒ Object

Handler for the ‘:CancelWorkflowExecutionFailed` event.

Parameters:

  • event (Object)

    The event to process



487
488
489
# File 'lib/aws/decider/async_decider.rb', line 487

def handle_cancel_workflow_execution_failed(event)
  handle_closing_failure
end

#handle_closing_failureObject



461
462
463
464
# File 'lib/aws/decider/async_decider.rb', line 461

def handle_closing_failure
  @unhandled_decision = true
  @decision_helper[[:SELF, nil]].consume(:handle_initiation_failed_event)
end

#handle_complete_workflow_execution_failed(event) ⇒ Object

Handler for the ‘:CompleteWorkflowExecutionFailed` event.

Parameters:

  • event (Object)

    The event to process



469
470
471
# File 'lib/aws/decider/async_decider.rb', line 469

def handle_complete_workflow_execution_failed(event)
  handle_closing_failure
end

#handle_continue_as_new_workflow_execution_failed(event) ⇒ Object

Handler for the ‘:ContinueAsNewWorkflowExecutionFailed’ event.

Parameters:

  • event (Object)

    The event to process



496
497
498
# File 'lib/aws/decider/async_decider.rb', line 496

def handle_continue_as_new_workflow_execution_failed(event)
  handle_closing_failure
end

#handle_fail_workflow_execution_failed(event) ⇒ Object

Handler for the ‘:FailWorkflowExecutionFailed` event.

Parameters:

  • event (Object)

    The event to process



478
479
480
# File 'lib/aws/decider/async_decider.rb', line 478

def handle_fail_workflow_execution_failed(event)
  handle_closing_failure
end

#handle_request_cancel_activity_task_failed(event) ⇒ Object

Handler for the ‘:RequestCancelActivityTaskFailed` event.

Parameters:

  • event (Object)

    The event to process



454
455
456
457
458
459
# File 'lib/aws/decider/async_decider.rb', line 454

def handle_request_cancel_activity_task_failed(event)
  handle_event(event, {
                 :id_methods => [:activity_id],
                 :consume_symbol => :handle_cancellation_failure_event
               })
end

#handle_request_cancel_external_workflow_execution_failed(event) ⇒ Object

Handler for the ‘:RequestCancelExternalWorkflowExecutionFailed` event.

Parameters:

  • event (Object)

    The event to process



560
561
562
563
564
565
# File 'lib/aws/decider/async_decider.rb', line 560

def handle_request_cancel_external_workflow_execution_failed(event)
  handle_event(event, {
                 :id_methods => [:workflow_id],
                 :consume_symbol => :handle_cancellation_failure_event
               })
end

#handle_request_cancel_external_workflow_execution_initiated(event) ⇒ Object

Handler for the ‘:RequestCancelExternalWorkflowExecutionInitiated` event.

Parameters:

  • event (Object)

    The event to process



548
549
550
551
552
553
# File 'lib/aws/decider/async_decider.rb', line 548

def handle_request_cancel_external_workflow_execution_initiated(event)
  handle_event(event, {
                 :id_methods => [:workflow_id],
                 :consume_symbol => :handle_cancellation_initiated_event
               })
end

#handle_signal_external_workflow_execution_initiated(event) ⇒ Object

Handler for the ‘:SignalExternalWorkflowExecutionInitiated` event.

Parameters:

  • event (Object)

    The event to process



536
537
538
539
540
541
# File 'lib/aws/decider/async_decider.rb', line 536

def handle_signal_external_workflow_execution_initiated(event)
  signal_id = event.attributes[:control]
  @decision_helper.signal_initiated_event_to_signal_id[event.id] = signal_id
  @decision_helper[signal_id].consume(:handle_initiated_event)
  @decision_helper[signal_id].done?
end

#handle_start_child_workflow_execution_initiated(event) ⇒ Object

Handler for the ‘:StartChildWorkflowExecutionInitiated` event.

Parameters:

  • event (Object)

    The event to process



572
573
574
575
576
577
# File 'lib/aws/decider/async_decider.rb', line 572

def handle_start_child_workflow_execution_initiated(event)
  workflow_id = event.attributes[:workflow_id]
  @decision_helper.child_initiated_event_id_to_workflow_id[event.id] = workflow_id
  @decision_helper[workflow_id].consume(:handle_initiated_event)
  @decision_helper[workflow_id].done?
end

#handle_start_timer_failed(event) ⇒ Object

Handler for the ‘:StartTimerFailed` event.

Parameters:

  • event (Object)

    The event to process



418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
# File 'lib/aws/decider/async_decider.rb', line 418

def handle_start_timer_failed(event)
  timer_id = event.attributes.timer_id
  return if timer_id == DecisionHelper.force_immediate_decision_timer
  handle_event(event, {
                 :id_methods => [:timer_id],
                 :consume_symbol => :handle_completion_event,
                 :decision_helper_scheduled => :scheduled_timers,
                 :handle_open_request => lambda do |event, open_request|
                   exception = StartTimerFailedException(event.id, timer_id, nil, event.attributes.cause)
                   open_request.completion_handle.fail(exception)
                 end
               })
  state_machine = @decision_helper[timer_id]


end

#handle_timer_canceled(event) ⇒ Object

Handler for the ‘:TimerCanceled` event.

Parameters:

  • event (Object)

    The event to process



517
518
519
520
521
522
523
524
525
526
527
528
529
# File 'lib/aws/decider/async_decider.rb', line 517

def handle_timer_canceled(event)
  handle_event(event, {
                 :id_methods => [:timer_id],
                 :consume_symbol => :handle_cancellation_event,
                 :decision_helper_scheduled => :scheduled_timers,
                 :handle_open_request => lambda do |event, open_request|
                   if ! open_request.nil?
                     cancellation_exception = CancellationException.new("Cancelled from a Timer Cancelled event")
                     open_request.completion_handle.fail(cancellation_exception)
                   end
                 end
               })
end

#handle_timer_fired(event) ⇒ Object

Handler for the ‘:TimerFired` event.

Parameters:

  • event (Object)

    The event to process



402
403
404
405
406
407
408
409
410
411
# File 'lib/aws/decider/async_decider.rb', line 402

def handle_timer_fired(event)
  timer_id = event.attributes[:timer_id]
  return if timer_id == DecisionHelper.force_immediate_decision_timer
  @decision_helper[timer_id].consume(:handle_completion_event)
  if @decision_helper[timer_id].done?
    open_request = @decision_helper.scheduled_timers.delete(timer_id)
    open_request.blocking_promise.set(nil)
    open_request.completion_handle.complete
  end
end

#handle_timer_started(event) ⇒ Object

Handler for the ‘:TimerStarted` event.

Parameters:

  • event (Object)

    The event to process



505
506
507
508
509
510
# File 'lib/aws/decider/async_decider.rb', line 505

def handle_timer_started(event)
  timer_id = event.attributes[:timer_id]
  return if timer_id == DecisionHelper.force_immediate_decision_timer
  @decision_helper[timer_id].consume(:handle_initiated_event)
  @decision_helper[timer_id].done?
end

#handle_workflow_execution_cancel_requested(event) ⇒ Object

Handler for the ‘:WorkflowExecutionCancelRequested` event.

Parameters:

  • event (Object)

    The event to process



438
439
440
441
# File 'lib/aws/decider/async_decider.rb', line 438

def handle_workflow_execution_cancel_requested(event)
  @workflow_async_scope.cancel(CancellationException.new("Cancelled from a WorkflowExecutionCancelRequested"))
  @cancel_requested = true
end

#handle_workflow_execution_signaled(event) ⇒ Object

Handler for the ‘:WorkflowExecutionSignaled` event.

Parameters:

  • event (Object)

    The event to process



596
597
598
599
600
601
602
603
604
605
606
# File 'lib/aws/decider/async_decider.rb', line 596

def handle_workflow_execution_signaled(event)
  signal_name = event.attributes[:signal_name]
  input = event.attributes[:input] if event.attributes.keys.include? :input
  input ||= NoInput.new
  # TODO do stuff if we are @completed
  t = Task.new(nil) do
    @definition.signal_received(signal_name, input)
  end
  task_context = TaskContext.new(:parent => @workflow_async_scope.get_closest_containing_scope, :task => t)
  @workflow_async_scope.get_closest_containing_scope << t
end

#handle_workflow_execution_started(event) ⇒ Object

Handler for the ‘:WorkflowExecutionStarted` event.

Parameters:

  • event (Object)

    The event to process



388
389
390
391
392
393
394
395
# File 'lib/aws/decider/async_decider.rb', line 388

def handle_workflow_execution_started(event)
  @workflow_async_scope = AsyncScope.new do
    FlowFiber.current[:decision_context] = @decision_context
    input = (event.attributes.keys.include? :input) ?  event.attributes[:input] : nil
    @definition = @workflow_definition_factory.get_workflow_definition(@decision_context)
    @result = @definition.execute(input)
  end
end

#make_cancel_decision(decision_id) ⇒ Object



311
312
313
# File 'lib/aws/decider/async_decider.rb', line 311

def make_cancel_decision(decision_id)
  CompleteWorkflowStateMachine.new(decision_id, {:decision_type => "CancelWorkflowExecution"})
end

#make_completion_decision(decision_id, decision) ⇒ Object



308
309
310
# File 'lib/aws/decider/async_decider.rb', line 308

def make_completion_decision(decision_id, decision)
  CompleteWorkflowStateMachine.new(decision_id, decision)
end

#make_fail_decision(decision_id, failure) ⇒ Object

failing in this execution. This information can be useful for tracing the sequence of events back from the

failure.

Parameters:

  • failure (Exception)

    The exception that is associated with the failed workflow.

See Also:



279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
# File 'lib/aws/decider/async_decider.rb', line 279

def make_fail_decision(decision_id, failure)
  decision_type = "FailWorkflowExecution"

  # Sizes taken from
  # http://docs.aws.amazon.com/amazonswf/latest/apireference/API_FailWorkflowExecutionDecisionAttributes.html
  #reason = failure.reason if (failure.respond_to? :reason)
  max_response_size = 32768
  truncation_overhead = 8000
  reason ||= failure.message.slice(0, 255)
  detail_size = max_response_size - truncation_overhead

  # If you don't have details, you must be some other type of
  # exception. We can't do anything exceedingly clever, so lets just get
  # the stack trace and pop that out
  details = failure.details if (failure.respond_to? :details)
  details ||= failure.backtrace.join("")
  new_details = details[0..(max_response_size - truncation_overhead)]
  if details.length > (max_response_size - truncation_overhead)
    new_details += "->->->->->THIS BACKTRACE WAS TRUNCATED"
  end
  # details.unshift(reason)
  # details = details.join("\n")

  fail_workflow_execution_decision_attributes = {:reason => reason, :details => new_details}
  decision = {:decision_type => decision_type, :fail_workflow_execution_decision_attributes => fail_workflow_execution_decision_attributes}
  CompleteWorkflowStateMachine.new(decision_id, decision)

end

#process_event(event) ⇒ Object

Processes decider events

Parameters:

  • event (Object)

    The event to process



613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
# File 'lib/aws/decider/async_decider.rb', line 613

def process_event(event)
  event_type_symbol = event.event_type.to_sym
  # Mangle the name so that it is handle_ + the name of the event type in snakecase
  handle_event = "handle_" + event.event_type.gsub(/(.)([A-Z])/,'\1_\2').downcase
  noop_set = Set.new([:DecisionTaskScheduled, :DecisionTaskCompleted,
  :DecisionTaskStarted, :DecisionTaskTimedOut, :WorkflowExecutionTimedOut,
  :WorkflowExecutionTerminated, :MarkerRecorded,
  :WorkflowExecutionCompleted, :WorkflowExecutionFailed,
  :WorkflowExecutionCanceled, :WorkflowExecutionContinuedAsNew, :ActivityTaskStarted])

  return if noop_set.member? event_type_symbol

  self_set = Set.new([:TimerFired, :StartTimerFailed,
  :WorkflowExecutionCancel, :ActivityTaskScheduled,
  :WorkflowExecutionCancelRequested,
  :ActivityTaskCancelRequested, :RequestCancelActivityTaskFailed,
  :CompleteWorkflowExecutionFailed, :FailWorkflowExecutionFailed,
  :CancelWorkflowExecutionFailed, :ContinueAsNewWorkflowExecutionFailed,
  :TimerStarted, :TimerCanceled,
  :SignalExternalWorkflowExecutionInitiated,
  :RequestCancelExternalWorkflowExecutionInitiated,
  :RequestCancelExternalWorkflowExecutionFailed,
  :StartChildWorkflowExecutionInitiated, :CancelTimerFailed, :WorkflowExecutionStarted, :WorkflowExecutionSignaled])

  activity_client_set = Set.new([:ActivityTaskCompleted,
  :ActivityTaskCanceled, :ActivityTaskTimedOut,
  :ScheduleActivityTaskFailed, :ActivityTaskFailed])

  workflow_client_set =
  Set.new([:ExternalWorkflowExecutionCancelRequested,
  :ChildWorkflowExecutionCanceled, :ChildWorkflowExecutionCompleted,
  :ChildWorkflowExecutionFailed,
  :ChildWorkflowExecutionStarted, :ChildWorkflowExecutionTerminated,
  :ChildWorkflowExecutionTimedOut, :ExternalWorkflowExecutionSignaled,
  :SignalExternalWorkflowExecutionFailed,
  :StartChildWorkflowExecutionFailed])

  event_set_to_object_mapping = { self_set => self,
    activity_client_set => @activity_client,
    workflow_client_set => @workflow_client }
  thing_to_operate_on = event_set_to_object_mapping.map {|key, value|
    value if key.member? event_type_symbol }.compact.first
  thing_to_operate_on.send(handle_event, event)
    # DecisionTaskStarted is taken care of at TODO
end