Class: AWS::Flow::GenericActivityClient

Inherits:
GenericClient show all
Defined in:
lib/aws/decider/activity.rb

Overview

A generic activity client that can be used to perform standard activity actions.

Instance Attribute Summary collapse

Attributes inherited from GenericClient

#option_map

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from GenericClient

#decision_context, #exponential_retry, #reconfigure, #retry, #send_async, #with_opts

Constructor Details

#initialize(decision_helper, options) ⇒ GenericActivityClient

Creates a new GenericActivityClient instance.

Parameters:

  • decision_helper (DecisionHelper)

    The decision helper to use for the activity client.

  • options (ActivityOptions)

    ActivityOptions to set for the activity client.



62
63
64
65
66
67
68
69
# File 'lib/aws/decider/activity.rb', line 62

# Builds an activity client bound to the given decision helper.
#
# @param decision_helper [DecisionHelper] helper used for all decision
#   bookkeeping; its activity_options seed the per-activity option map.
# @param options [ActivityOptions] client-wide options merged into every
#   activity scheduled through this client.
def initialize(decision_helper, options)
  @decision_helper = decision_helper
  @options = options
  # Per-activity options registered on the decision helper.
  @activity_option_map = decision_helper.activity_options
  # Errors recorded by schedule_activity, keyed by decision id (as a String).
  @failure_map = {}
  # Keep any converter that is already set; otherwise default to YAML.
  @data_converter ||= YAMLDataConverter.new
  # Hand the same two arguments to GenericClient#initialize.
  super(decision_helper, options)
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(method_name, *args, &block) ⇒ Object

Runs an activity given a name and block of options.

Parameters:

  • method_name

    The name of the activity type to define

  • args

    Arguments for the method

  • block

    A block of ActivityOptions



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/aws/decider/activity.rb', line 82

# Treats any unknown method call as a request to run the activity of that
# name.
#
# Options are gathered from four sources and merged in this order: the
# client-wide options, the decision helper's per-activity options, this
# client's option map entry, and the options produced by the given block.
# NOTE(review): later sources presumably win in merge_all_options - confirm.
#
# @param method_name [Symbol, String] name of the activity to run.
# @param args [Array] arguments passed on as the activity input.
# @param block [Proc] optional block of ActivityOptions.
# @return [Object] the activity result, or a future when return_on_start
#   is set on the merged options.
def method_missing(method_name, *args, &block)
  block_options = Utilities::interpret_block_for_options(ActivityOptions, block)
  activity_key = method_name.to_sym
  merged_options = Utilities::merge_all_options(
    @options,
    @activity_option_map[activity_key],
    @option_map[activity_key],
    block_options
  )
  new_options = ActivityOptions.new(merged_options)

  # The registered activity type name is "<prefix_name>.<method name>".
  type_name = "#{new_options.prefix_name}.#{method_name}"
  activity_type = ActivityType.new(type_name, new_options.version, new_options.get_default_options)

  # No retry policy configured: schedule once and hand back the outcome.
  unless new_options._exponential_retry
    return schedule_activity(activity_type.name, activity_type, args, new_options)
  end

  # Retry policy configured: wrap the scheduling call so it can be re-run.
  retry_function = new_options._exponential_retry.retry_function || FlowConstants.exponential_retry_function
  new_options._exponential_retry.return_on_start ||= new_options.return_on_start
  schedule_call = lambda { self.schedule_activity(activity_type.name, activity_type, args, new_options) }
  future = _retry_with_options(schedule_call, retry_function, new_options._exponential_retry, args)
  return future if new_options.return_on_start

  Utilities::drill_on_future(future)
end

Instance Attribute Details

#data_converter ⇒ Object

The data converter used for serializing/deserializing data when sending input to and receiving results from the activities scheduled by this client. By default, this is YAMLDataConverter.



35
36
37
# File 'lib/aws/decider/activity.rb', line 35

# Reader for the converter used to serialize activity input and
# deserialize activity results.
#
# @return [Object] the current data converter.
def data_converter; @data_converter; end

#decision_helper ⇒ Object

The decision helper used by the activity client.



38
39
40
# File 'lib/aws/decider/activity.rb', line 38

# Reader for the decision helper this client was constructed with.
#
# @return [Object] the decision helper.
def decision_helper; @decision_helper; end

#options ⇒ Object

A hash of ActivityRuntimeOptions for the activity client



41
42
43
# File 'lib/aws/decider/activity.rb', line 41

# Reader for the client-wide options supplied at construction time.
#
# @return [Object] the options object.
def options; @options; end

Class Method Details

.default_option_class ⇒ Object

Returns the default option class for the activity client, which is ActivityRuntimeOptions.



44
# File 'lib/aws/decider/activity.rb', line 44

# Names the option class this client uses by default.
#
# @return [Class] ActivityRuntimeOptions.
def self.default_option_class
  ActivityRuntimeOptions
end

Instance Method Details

#activity_name_from_activity_type(name) ⇒ Object

Separates the activity name from the activity type at the point of the last (.) symbol.

Parameters:

  • name (String)

    The name of the activity type.



51
52
53
# File 'lib/aws/decider/activity.rb', line 51

# Extracts the bare activity name from a dotted activity type name.
#
# The name is converted to a String, split on ".", and the final segment
# is returned as a Symbol (e.g. "Prefix.do_work" gives :do_work).
#
# @param name [String, Symbol] the activity type name.
# @return [Symbol] the segment after the last ".".
def activity_name_from_activity_type(name)
  segments = name.to_s.split(".")
  segments[-1].to_sym
end

#handle_activity_task_canceled(event) ⇒ Object

A handler for the ActivityTaskCanceled event.

Parameters:

  • event (ActivityTaskCanceled)

    The event data.



132
133
134
135
136
137
138
139
140
141
142
# File 'lib/aws/decider/activity.rb', line 132

# Handles an activity-task-canceled history event.
#
# Feeds :handle_cancellation_event to the activity's state machine. Once
# the machine reports done, the open request for the activity is removed
# and, if one was found, its completion handle is failed with a
# CancellationException.
#
# @param event [Object] history event whose attributes carry
#   :scheduled_event_id.
# @return [Object, nil] result of failing the completion handle, or nil
#   when nothing was failed.
def handle_activity_task_canceled(event)
  scheduled_event_id = event.attributes[:scheduled_event_id]
  activity_id = @decision_helper.get_activity_id(scheduled_event_id)
  @decision_helper[activity_id].consume(:handle_cancellation_event)
  return unless @decision_helper[activity_id].done?

  pending_request = @decision_helper.scheduled_activities.delete(activity_id)
  cancellation = CancellationException.new("Cancelled from ActivityTaskCanceledEvent", nil)
  pending_request.completion_handle.fail(cancellation) unless pending_request.nil?
end

#handle_activity_task_completed(event) ⇒ Object

A handler for the ActivityTaskCompleted event.

Parameters:

  • event (ActivityTaskCompleted)

    The event data.



207
208
209
210
211
212
213
214
215
216
# File 'lib/aws/decider/activity.rb', line 207

# Handles an activity-task-completed history event.
#
# Resolves the activity id from the scheduled event id, feeds
# :handle_completion_event to the activity's state machine and, once the
# machine reports done, removes the open request, stores the event's
# :result on it and completes its handle.
#
# @param event [Object] history event whose attributes carry
#   :scheduled_event_id and :result.
# @return [Object, nil] result of completing the handle, or nil when the
#   state machine is not done.
def handle_activity_task_completed(event)
  scheduled_event_id = event.attributes[:scheduled_event_id]
  id_lookup = @decision_helper.activity_scheduling_event_id_to_activity_id
  activity_id = id_lookup[scheduled_event_id]
  @decision_helper[activity_id].consume(:handle_completion_event)
  return unless @decision_helper[activity_id].done?

  finished_request = @decision_helper.scheduled_activities.delete(activity_id)
  finished_request.result = event.attributes[:result]
  finished_request.completion_handle.complete
end

#handle_activity_task_failed(event) ⇒ Object

A handler for the ActivityTaskFailed event.

Parameters:

  • event (ActivityTaskFailed)

    The event data.



168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/aws/decider/activity.rb', line 168

# Handles an activity-task-failed history event.
#
# Feeds :handle_completion_event to the activity's state machine, removes
# the open request and fails its completion handle with an
# ActivityTaskFailedException. When the event carries no :reason or
# :details (or they are nil), fixed placeholder strings are used instead.
#
# @param event [Object] history event whose attributes carry
#   :scheduled_event_id and, optionally, :reason and :details.
# @return [Object] result of failing the completion handle.
def handle_activity_task_failed(event)
  attrs = event.attributes
  activity_id = @decision_helper.get_activity_id(attrs[:scheduled_event_id])
  @decision_helper[activity_id].consume(:handle_completion_event)
  pending_request = @decision_helper.scheduled_activities.delete(activity_id)

  # Only read a key the event actually has; fall back to a placeholder.
  reason = (attrs[:reason] if attrs.keys.include?(:reason)) ||
           "The activity which failed did not provide a reason"
  details = (attrs[:details] if attrs.keys.include?(:details)) ||
            "The activity which failed did not provide details"

  # TODO consider adding user_context to open request, and adding it here
  # @decision_helper[@decision_helper.activity_scheduling_event_id_to_activity_id[event.attributes.scheduled_event_id]].attributes[:options].data_converter
  failure = ActivityTaskFailedException.new(event.id, activity_id, reason, details)
  pending_request.completion_handle.fail(failure)
end

#handle_activity_task_timed_out(event) ⇒ Object

A handler for the ActivityTaskTimedOut event.

Parameters:

  • event (ActivityTaskTimedOut)

    The event data.



149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/aws/decider/activity.rb', line 149

# Handles an activity-task-timed-out history event.
#
# Feeds :handle_completion_event to the activity's state machine. Once the
# machine reports done, the open request is removed and, if one was found,
# its completion handle is failed with an ActivityTaskTimedOutException
# carrying the event's :timeout_type.
#
# @param event [Object] history event whose attributes carry
#   :scheduled_event_id and :timeout_type.
# @return [Object, nil] result of failing the completion handle, or nil
#   when nothing was failed.
def handle_activity_task_timed_out(event)
  activity_id = @decision_helper.get_activity_id(event.attributes[:scheduled_event_id])
  state_machine = @decision_helper[activity_id]
  state_machine.consume(:handle_completion_event)
  return unless state_machine.done?

  pending_request = @decision_helper.scheduled_activities.delete(activity_id)
  return if pending_request.nil?

  timeout_type = event.attributes[:timeout_type]
  timeout_error = ActivityTaskTimedOutException.new(event.id, activity_id, timeout_type, "Time out")
  pending_request.completion_handle.fail(timeout_error)
end

#handle_schedule_activity_task_failed(event) ⇒ Object

A handler for the ScheduleActivityTaskFailed event.

Parameters:

  • event (ScheduleActivityTaskFailed)

    The event data.



189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/aws/decider/activity.rb', line 189

# Handles a schedule-activity-task-failed history event.
#
# Removes the open request for the activity named in the event, feeds
# :handle_initiation_failed_event to its state machine and, once the
# machine reports done, fails the request's completion handle with a
# ScheduleActivityTaskFailedException built from the event's activity type
# and cause.
#
# @param event [Object] history event whose attributes expose activity_id,
#   activity_type and cause.
# @return [Object, nil] result of failing the completion handle, or nil
#   when the state machine is not done.
def handle_schedule_activity_task_failed(event)
  attrs = event.attributes
  activity_id = attrs[:activity_id]
  # The open request is removed before the state machine is consulted.
  pending_request = @decision_helper.scheduled_activities.delete(activity_id)
  state_machine = @decision_helper[activity_id]
  state_machine.consume(:handle_initiation_failed_event)
  return unless state_machine.done?

  # TODO Fail task correctly
  failure = ScheduleActivityTaskFailedException.new(event.id, attrs.activity_type, activity_id, attrs.cause)
  pending_request.completion_handle.fail(failure)
end

#request_cancel_activity_task(to_cancel) ⇒ Object

Requests that the activity is canceled.

Parameters:

  • to_cancel (Future)

    The Future for the task to be canceled.



119
120
121
122
123
124
125
# File 'lib/aws/decider/activity.rb', line 119

# Requests cancellation of the activity behind the given future.
#
# The future must carry metadata that exposes an activity_id;
# schedule_activity attaches such metadata to the futures it creates. The
# matching state machine on the decision helper is then sent :cancel.
#
# @param to_cancel [Future] future obtained from scheduling an activity.
# @return [Object] whatever the state machine's consume(:cancel) returns.
# @raise [RuntimeError] if the future's metadata has no activity_id.
def request_cancel_activity_task(to_cancel)
  metadata = to_cancel.metadata
  unless metadata.respond_to?(:activity_id)
    raise "You need to use a future obtained from an activity"
  end
  @decision_helper[metadata.activity_id].consume(:cancel)
end

#schedule_activity(name, activity_type, input, options) ⇒ Object

Schedules the named activity.

Parameters:

  • name (String)

    The name of the activity to schedule.

  • activity_type (String)

    The activity type for this scheduled activity.

  • input (Object)

    Optional data passed to the activity.

  • options (ActivityOptions)

    ActivityOptions to set for the scheduled activity.



232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/aws/decider/activity.rb', line 232

# Schedules the named activity and wires up its result future.
#
# The per-activity entry from this client's option map is merged with the
# given options. A future is created and tagged with metadata for the new
# decision id, and an external task registers the open request plus an
# ActivityDecisionStateMachine on the decision helper. Errors raised while
# the task runs are recorded in @failure_map under the decision id.
#
# @param name [String] activity type name; the part after the last "." is
#   used to look up this client's per-activity options.
# @param activity_type [Object] the activity type for this scheduled
#   activity.
# @param input [Object] data passed to the activity; it is serialized with
#   the options' data converter unless it is empty. Must respond to empty?.
# @param options [ActivityOptions] options for this scheduled activity.
# @return [Object] the future itself when return_on_start is set on the
#   merged options, otherwise the value obtained from the future.
# @raise [Exception] the recorded failure for this decision id, if any.
def schedule_activity(name, activity_type, input, options)
  options = Utilities::merge_all_options(@option_map[activity_name_from_activity_type(name)], options)
  new_options = ActivityOptions.new(options)
  output = Utilities::AddressableFuture.new
  open_request = OpenRequestInfo.new
  decision_id = @decision_helper.get_next_id(:Activity)
  # Tag the future so request_cancel_activity_task can find the activity.
  # NOTE(review): assumes ActivityMetadata exposes activity_id - confirm.
  output.metadata = ActivityMetadata.new(decision_id)
  error_handler do |t|
    t.begin do
      @data_converter = new_options.data_converter

      #input = input.map { |input_part| @data_converter.dump input_part } unless input.nil?
      input = @data_converter.dump input unless input.empty?
      attributes = {}
      # An input already present on the options is kept.
      new_options.input ||= input unless input.empty?
      attributes[:options] = new_options
      attributes[:activity_type] = activity_type
      attributes[:decision_id] = decision_id
      @completion_handle = nil
      external_task do |t|
        t.initiate_task do |handle|
          open_request.completion_handle = handle

          # Both registries are keyed by the decision id as a String.
          @decision_helper.scheduled_activities[decision_id.to_s] = open_request
          @decision_helper[decision_id.to_s] = ActivityDecisionStateMachine.new(decision_id, attributes)
        end
        t.cancellation_handler do |this_handle, cause|
          state_machine = @decision_helper[decision_id.to_s]
          # A machine still in :created has its open request removed and
          # completed right here, before :cancel is consumed.
          if state_machine.current_state == :created
            open_request = @decision_helper.scheduled_activities.delete(decision_id.to_s)
            open_request.completion_handle.complete
          end
          state_machine.consume(:cancel)
        end
      end
    end
    t.rescue(Exception) do |error|
      @data_converter = new_options.data_converter
      # If we have an ActivityTaskFailedException, then we should figure
      # out what the cause was, and pull that out. If it's anything else,
      # we should serialize the error, and stuff that into details, so
      # that things above us can pull it out correctly. We don't have to
      # do this for ActivityTaskFailedException, as the details is
      # *already* serialized
      if error.is_a? ActivityTaskFailedException
        details = @data_converter.load(error.details)
        error.cause = details
      else
        details = @data_converter.dump(error)
        error.details = details
      end
      # Remember the failure so it can be raised for this decision id.
      @failure_map[decision_id.to_s] = error
    end
    t.ensure do
      @data_converter = new_options.data_converter
      # Deserialize whatever result the open request holds and publish it
      # on the future.
      result = @data_converter.load open_request.result
      output.set(result)
      # With return_on_start the caller already holds the future, so the
      # recorded failure is raised from here.
      raise @failure_map[decision_id.to_s] if @failure_map[decision_id.to_s] && new_options.return_on_start
    end
  end
  return output if new_options.return_on_start
  # Obtain the value first, then raise any failure recorded for this
  # decision id.
  output.get
  this_failure = @failure_map[decision_id.to_s]
  raise this_failure if this_failure
  return output.get
end