Class: AWS::Flow::GenericActivityClient
- Inherits: GenericClient
  - Object
  - GenericClient
  - AWS::Flow::GenericActivityClient
- Defined in: lib/aws/decider/activity.rb
Overview
A generic activity client that can be used to perform standard activity actions.
Instance Attribute Summary
- #data_converter ⇒ Object
  The data converter used for serializing/deserializing data when sending requests to and receiving results from workflow executions of this workflow type.
- #decision_helper ⇒ Object
  The decision helper used by the activity client.
- #options ⇒ Object
  A hash of ActivityRuntimeOptions for the activity client.
Attributes inherited from GenericClient
Class Method Summary
- .default_option_class ⇒ Object
  Returns the default option class for the activity client, which is ActivityRuntimeOptions.
Instance Method Summary
- #activity_name_from_activity_type(name) ⇒ Object
  Returns the activity name, that is, the part of the activity type name after the last period (.), as a symbol.
- #handle_activity_task_canceled(event) ⇒ Object
  A handler for the ActivityTaskCanceled event.
- #handle_activity_task_completed(event) ⇒ Object
  A handler for the ActivityTaskCompleted event.
- #handle_activity_task_failed(event) ⇒ Object
  A handler for the ActivityTaskFailed event.
- #handle_activity_task_timed_out(event) ⇒ Object
  A handler for the ActivityTaskTimedOut event.
- #handle_schedule_activity_task_failed(event) ⇒ Object
  A handler for the ScheduleActivityTaskFailed event.
- #initialize(decision_helper, options) ⇒ GenericActivityClient (constructor)
  Creates a new GenericActivityClient instance.
- #method_missing(method_name, *args, &block) ⇒ Object
  Runs an activity given a name and block of options.
- #request_cancel_activity_task(to_cancel) ⇒ Object
  Requests cancellation of the activity.
- #schedule_activity(name, activity_type, input, options) ⇒ Object
  Schedules the named activity.
Methods inherited from GenericClient
#decision_context, #exponential_retry, #reconfigure, #retry, #send_async, #with_opts
Constructor Details
#initialize(decision_helper, options) ⇒ GenericActivityClient
Creates a new GenericActivityClient instance.
    # File 'lib/aws/decider/activity.rb', line 62
    def initialize(decision_helper, options)
      @decision_helper = decision_helper
      @options =
      @activity_option_map = @decision_helper.
      @failure_map = {}
      @data_converter ||= YAMLDataConverter.new
      super
    end
Dynamic Method Handling
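This class handles dynamic methods through the method_missing method: a call to any method the client does not define is treated as a request to run the activity of that name.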
#method_missing(method_name, *args, &block) ⇒ Object
Runs an activity given a name and block of options.
    # File 'lib/aws/decider/activity.rb', line 82
    def method_missing(method_name, *args, &block)
       = Utilities::(ActivityOptions, block)
       = Utilities::(@options, @activity_option_map[method_name.to_sym], @option_map[method_name.to_sym], )
       = ActivityOptions.new()
      activity_type = ActivityType.new("#{.prefix_name}.#{method_name.to_s}", .version, .)
      if ._exponential_retry
        retry_function = ._exponential_retry.retry_function || FlowConstants.exponential_retry_function
        ._exponential_retry.return_on_start ||= .return_on_start
        future = (lambda { self.schedule_activity(activity_type.name, activity_type, args, ) }, retry_function, ._exponential_retry, args)
        return future if .return_on_start
        result = Utilities::drill_on_future(future)
      else
        result = schedule_activity(activity_type.name, activity_type, args, )
      end
      result
    end
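The dispatch pattern in the listing above can be sketched in isolation. This is a minimal illustration, not the library's implementation: `SketchActivityClient`, its constructor arguments, and the `scheduled` list are hypothetical names introduced here. The only detail taken from the listing is that the activity type name is built as `"#{prefix_name}.#{method_name}"` and that the call arguments become the activity input.

```ruby
# Hypothetical stand-in for illustration; not part of AWS::Flow.
class SketchActivityClient
  # Records what would have been scheduled, in call order.
  attr_reader :scheduled

  def initialize(prefix_name, version)
    @prefix_name = prefix_name
    @version = version
    @scheduled = []
  end

  # Any method the class does not define is treated as an activity name.
  # The type name is built as "<prefix>.<method>", as in the listing above.
  def method_missing(method_name, *args, &block)
    type_name = "#{@prefix_name}.#{method_name}"
    @scheduled << { name: type_name, version: @version, input: args }
    type_name
  end

  # Keep respond_to? consistent with method_missing.
  def respond_to_missing?(method_name, include_private = false)
    true
  end
end

client = SketchActivityClient.new("MyActivities", "1.0")
client.send_email("user@example.com")
```

After the call, `client.scheduled` holds one entry whose `:name` is `"MyActivities.send_email"`. The sketch leaves out option merging, retries, and futures, which the real method handles.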
Instance Attribute Details
#data_converter ⇒ Object
The data converter used for serializing/deserializing data when sending requests to and receiving results from workflow executions of this workflow type. By default, this is YAMLDataConverter.
    # File 'lib/aws/decider/activity.rb', line 35
    def data_converter
      @data_converter
    end
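Elsewhere in this class the data converter is only ever called through `dump` and `load`, so any object that responds to both should fit. The following is a minimal sketch under that assumption: `SketchYamlConverter` is a hypothetical name, and the code does not claim to reproduce how YAMLDataConverter is implemented.

```ruby
require "yaml"

# Hypothetical converter for illustration; not the library's YAMLDataConverter.
class SketchYamlConverter
  # Serialize an object to a YAML string.
  def dump(object)
    YAML.dump(object)
  end

  # Deserialize a YAML string; nil or empty input is treated as "no value".
  def load(source)
    return nil if source.nil? || source.empty?
    YAML.load(source)
  end
end

converter = SketchYamlConverter.new
wire = converter.dump(["user@example.com", 3])
args = converter.load(wire)
```

The round trip returns an array equal to the original arguments. The sketch sticks to strings and integers because newer YAML versions restrict which classes `YAML.load` will instantiate.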
#decision_helper ⇒ Object
The decision helper used by the activity client.
    # File 'lib/aws/decider/activity.rb', line 38
    def decision_helper
      @decision_helper
    end
#options ⇒ Object
A hash of ActivityRuntimeOptions for the activity client.
    # File 'lib/aws/decider/activity.rb', line 41
    def options
      @options
    end
Class Method Details
.default_option_class ⇒ Object
Returns the default option class for the activity client, which is ActivityRuntimeOptions.
    # File 'lib/aws/decider/activity.rb', line 44
    def self.default_option_class; ActivityRuntimeOptions; end
Instance Method Details
#activity_name_from_activity_type(name) ⇒ Object
Returns the activity name, that is, the part of the activity type name after the last period (.), as a symbol.
    # File 'lib/aws/decider/activity.rb', line 51
    def activity_name_from_activity_type(name)
      return name.to_s.split(".").last.to_sym
    end
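As a quick illustration, the method body above can be run as a standalone function. The activity type names used here are made up for the example.

```ruby
# Standalone copy of the method body shown above.
def activity_name_from_activity_type(name)
  name.to_s.split(".").last.to_sym
end

short_name = activity_name_from_activity_type("MyActivities.send_email")
nested_name = activity_name_from_activity_type("com.example.MyActivities.resize")
plain_name = activity_name_from_activity_type("standalone")
```

A name without any period is returned unchanged as a symbol. An empty string raises NoMethodError, because splitting it yields an empty array and `last` returns nil.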
#handle_activity_task_canceled(event) ⇒ Object
A handler for the ActivityTaskCanceled event.
    # File 'lib/aws/decider/activity.rb', line 132
    def handle_activity_task_canceled(event)
      activity_id = @decision_helper.get_activity_id(event.attributes[:scheduled_event_id])
      @decision_helper[activity_id].consume(:handle_cancellation_event)
      if @decision_helper[activity_id].done?
        open_request = @decision_helper.scheduled_activities.delete(activity_id)
        exception = CancellationException.new("Cancelled from ActivityTaskCanceledEvent", nil)
        if ! open_request.nil?
          open_request.completion_handle.fail(exception)
        end
      end
    end
#handle_activity_task_completed(event) ⇒ Object
A handler for the ActivityTaskCompleted event.
    # File 'lib/aws/decider/activity.rb', line 204
    def handle_activity_task_completed(event)
      scheduled_id = event.attributes[:scheduled_event_id]
      activity_id = @decision_helper.activity_scheduling_event_id_to_activity_id[scheduled_id]
      @decision_helper[activity_id].consume(:handle_completion_event)
      if @decision_helper[activity_id].done?
        open_request = @decision_helper.scheduled_activities.delete(activity_id)
        open_request.result = event.attributes[:result]
        open_request.completion_handle.complete
      end
    end
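The bookkeeping in the handler above (map the scheduled event ID to an activity ID, remove the open request, store the result) can be sketched with plain hashes. Everything named here is a hypothetical stand-in: `SketchDecisionHelper`, `OpenRequest`, `SketchEvent`, and `handle_completed` are not AWS::Flow classes or methods. The sketch omits the state machine and adds a nil guard that the original handler does not have.

```ruby
# Hypothetical stand-ins; not AWS::Flow classes.
OpenRequest = Struct.new(:result, :completed)
SketchEvent = Struct.new(:attributes)

class SketchDecisionHelper
  attr_reader :event_id_to_activity_id, :scheduled_activities

  def initialize
    @event_id_to_activity_id = {}
    @scheduled_activities = {}
  end
end

# Resolve the activity, remove its open request, and record the result.
def handle_completed(helper, event)
  scheduled_id = event.attributes[:scheduled_event_id]
  activity_id = helper.event_id_to_activity_id[scheduled_id]
  open_request = helper.scheduled_activities.delete(activity_id)
  return nil if open_request.nil? # guard added in this sketch only
  open_request.result = event.attributes[:result]
  open_request.completed = true
  open_request
end

helper = SketchDecisionHelper.new
helper.event_id_to_activity_id[5] = "Activity1"
helper.scheduled_activities["Activity1"] = OpenRequest.new(nil, false)

attrs = { scheduled_event_id: 5, result: "ok" }
event = SketchEvent.new(attrs)
finished = handle_completed(helper, event)
```

Because the open request is deleted when it completes, handling the same event a second time finds nothing to complete.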
#handle_activity_task_failed(event) ⇒ Object
A handler for the ActivityTaskFailed event.
    # File 'lib/aws/decider/activity.rb', line 168
    def handle_activity_task_failed(event)
      attributes = event.attributes
      activity_id = @decision_helper.get_activity_id(attributes[:scheduled_event_id])
      @decision_helper[activity_id].consume(:handle_completion_event)
      open_request_info = @decision_helper.scheduled_activities.delete(activity_id)
      reason = attributes[:reason]
      details = attributes[:details]
      # TODO consider adding user_context to open request, and adding it here
      # @decision_helper[@decision_helper.activity_scheduling_event_id_to_activity_id[event.attributes.scheduled_event_id]].attributes[:options].data_converter
      failure = ActivityTaskFailedException.new(event.id, activity_id, reason, details)
      open_request_info.completion_handle.fail(failure)
    end
#handle_activity_task_timed_out(event) ⇒ Object
A handler for the ActivityTaskTimedOut event.
    # File 'lib/aws/decider/activity.rb', line 149
    def handle_activity_task_timed_out(event)
      activity_id = @decision_helper.get_activity_id(event.attributes[:scheduled_event_id])
      activity_state_machine = @decision_helper[activity_id]
      activity_state_machine.consume(:handle_completion_event)
      if activity_state_machine.done?
        open_request = @decision_helper.scheduled_activities.delete(activity_id)
        if ! open_request.nil?
          timeout_type = event.attributes[:timeout_type]
          failure = ActivityTaskTimedOutException.new(event.id, activity_id, timeout_type, "Time out")
          open_request.completion_handle.fail(failure)
        end
      end
    end
#handle_schedule_activity_task_failed(event) ⇒ Object
A handler for the ScheduleActivityTaskFailed event.
    # File 'lib/aws/decider/activity.rb', line 186
    def handle_schedule_activity_task_failed(event)
      attributes = event.attributes
      activity_id = attributes[:activity_id]
      open_request_info = @decision_helper.scheduled_activities.delete(activity_id)
      activity_state_machine = @decision_helper[activity_id]
      activity_state_machine.consume(:handle_initiation_failed_event)
      if activity_state_machine.done?
        # TODO Fail task correctly
        failure = ScheduleActivityTaskFailedException.new(event.id, event.attributes.activity_type, activity_id, event.attributes.cause)
        open_request_info.completion_handle.fail(failure)
      end
    end
#request_cancel_activity_task(to_cancel) ⇒ Object
Requests cancellation of the activity. The argument must be a future obtained from an activity; otherwise an error is raised.
    # File 'lib/aws/decider/activity.rb', line 119
    def request_cancel_activity_task(to_cancel)
       = to_cancel.
      if ! .respond_to? :activity_id
        raise "You need to use a future obtained from an activity"
      end
      @decision_helper[.activity_id].consume(:cancel)
    end
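The guard in the listing above rejects any argument that does not expose an `activity_id`. A minimal sketch of that check follows. `SketchActivityHandle` and `activity_id_for_cancel` are hypothetical names introduced here; the error message and the `respond_to? :activity_id` test are the only parts taken from the listing.

```ruby
# Hypothetical stand-in for whatever object carries the activity ID.
SketchActivityHandle = Struct.new(:activity_id)

# Return the activity ID to cancel, or raise if the object has none.
def activity_id_for_cancel(handle)
  unless handle.respond_to?(:activity_id)
    raise "You need to use a future obtained from an activity"
  end
  handle.activity_id
end

id = activity_id_for_cancel(SketchActivityHandle.new("Activity1"))

message = begin
  activity_id_for_cancel("not a future")
rescue RuntimeError => e
  e.message
end
```

Calling `raise` with only a string produces a RuntimeError, which is why the sketch rescues that class.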
#schedule_activity(name, activity_type, input, options) ⇒ Object
Schedules the named activity.
    # File 'lib/aws/decider/activity.rb', line 229
    def schedule_activity(name, activity_type, input, options)
       = Utilities::(@option_map[activity_name_from_activity_type(name)], )
       = ActivityOptions.new()
      output = Utilities::AddressableFuture.new
      open_request = OpenRequestInfo.new
      decision_id = @decision_helper.get_next_id(:Activity)
      output. = ActivityMetadata.new(decision_id)
      error_handler do |t|
        t.begin do
          @data_converter = .data_converter
          #input = input.map { |input_part| @data_converter.dump input_part } unless input.nil?
          input = @data_converter.dump input unless input.empty?
          attributes = {}
          .input ||= input unless input.empty?
          attributes[:options] =
          attributes[:activity_type] = activity_type
          attributes[:decision_id] = decision_id
          @completion_handle = nil
          external_task do |t|
            t.initiate_task do |handle|
              open_request.completion_handle = handle
              @decision_helper.scheduled_activities[decision_id.to_s] = open_request
              @decision_helper[decision_id.to_s] = ActivityDecisionStateMachine.new(decision_id, attributes)
            end
            t.cancellation_handler do |this_handle, cause|
              state_machine = @decision_helper[decision_id.to_s]
              if state_machine.current_state == :created
                open_request = @decision_helper.scheduled_activities.delete(decision_id.to_s)
                open_request.completion_handle.complete
              end
              state_machine.consume(:cancel)
            end
          end
        end
        t.rescue(Exception) do |error|
          @data_converter = .data_converter
          # If we have an ActivityTaskFailedException, then we should figure
          # out what the cause was, and pull that out. If it's anything else,
          # we should serialize the error, and stuff that into details, so
          # that things above us can pull it out correctly. We don't have to
          # do this for ActivityTaskFailedException, as the details is
          # *already* serialized
          if error.is_a? ActivityTaskFailedException
            details = @data_converter.load(error.details)
            error.cause = details
          else
            details = @data_converter.dump(error)
            error.details = details
          end
          @failure_map[decision_id.to_s] = error
        end
        t.ensure do
          @data_converter = .data_converter
          result = @data_converter.load open_request.result
          output.set(result)
          raise @failure_map[decision_id.to_s] if @failure_map[decision_id.to_s] && .return_on_start
        end
      end
      return output if .return_on_start
      output.get
      this_failure = @failure_map[decision_id.to_s]
      raise this_failure if this_failure
      return output.get
    end
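The comment in the rescue block above describes two cases: an activity failure whose details are already serialized and only need decoding, and any other error, which has to be serialized first. The sketch below illustrates that branch. It is an assumption-laden illustration, not the library's code: `SketchTaskFailed` and `failure_record` are hypothetical names, and JSON stands in for whichever data converter is configured.

```ruby
require "json"

# Hypothetical exception standing in for ActivityTaskFailedException.
class SketchTaskFailed < StandardError
  attr_reader :details

  def initialize(message, details)
    super(message)
    @details = details
  end
end

# Build a record of the failure:
# - activity failures carry serialized details, so decode them as the cause;
# - any other error is serialized so callers can decode it later.
def failure_record(error)
  if error.is_a?(SketchTaskFailed)
    { error: error, details: error.details, cause: JSON.parse(error.details) }
  else
    payload = { "class" => error.class.name, "message" => error.message }
    { error: error, details: JSON.generate(payload), cause: nil }
  end
end

remote_details = JSON.generate("class" => "IOError", "message" => "disk full")
from_activity = failure_record(SketchTaskFailed.new("activity failed", remote_details))
from_local = failure_record(ArgumentError.new("bad input"))
```

Either way the record ends up with `details` in serialized form, which is the property the original comment is concerned with.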