Module: AWS::Flow::Runner
- Includes: AWS::Flow
- Defined in: lib/aws/runner.rb
Overview
The Runner is a command-line utility that can spawn workflow and activity workers according to a specification that you provide in a [JSON](json.org/) configuration file. It is available beginning with version 1.3.0 of the AWS Flow Framework for Ruby.
## Invoking the Runner
It is invoked like so:

```sh
aws-flow-ruby -f runspec.json
```

Here, `runspec.json` is a local JSON file that specifies how to run your activities and workflows. The `-f` option is the short form of `--file`.
## The Runner Specification File
The runner is configured by passing it a JSON-formatted configuration file. Here is a minimal example, providing only the required fields:
```json
{
  "domain": { "name": "ExampleDomain" },
  "workflow_workers": [
    {
      "task_list": "example_workflow_tasklist"
    }
  ],
  "activity_workers": [
    {
      "task_list": "example_activity_tasklist"
    }
  ]
}
```
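Beyond the required fields, the method details below show the runner reading several optional keys: `domain.retention_in_days`, `user_agent_prefix`, the top-level `activity_paths` and `workflow_paths`, `default_workers`, and, per worker entry, `activity_classes` or `workflow_classes`, `number_of_workers`, and `number_of_forks_per_worker`. The following Ruby sketch parses a fuller specification and counts the worker processes each section requests. The class names and paths are placeholders, `describe_spec` is a hypothetical helper (not part of the runner), and `ASSUMED_DEFAULT_WORKERS` is an assumed stand-in for `FlowConstants::NUM_OF_WORKERS_DEFAULT`, whose value is not shown on this page.

```ruby
require "json"

# A fuller spec; every key below is one the Runner source reads.
# Class names and paths are placeholders.
SPEC_JSON = <<~JSON
  {
    "domain": { "name": "ExampleDomain", "retention_in_days": 10 },
    "user_agent_prefix": "example-app",
    "activity_paths": ["flow/activities.rb"],
    "workflow_paths": ["flow/workflows.rb"],
    "workflow_workers": [
      {
        "task_list": "example_workflow_tasklist",
        "workflow_classes": ["ExampleWorkflow"],
        "number_of_workers": 2
      }
    ],
    "activity_workers": [
      {
        "task_list": "example_activity_tasklist",
        "activity_classes": ["ExampleActivity"],
        "number_of_workers": 3,
        "number_of_forks_per_worker": 5
      }
    ]
  }
JSON

# ASSUMPTION: stands in for FlowConstants::NUM_OF_WORKERS_DEFAULT.
ASSUMED_DEFAULT_WORKERS = 1

# Hypothetical helper: lists the number of worker processes requested by
# each entry in the workflow_workers and activity_workers sections.
def describe_spec(spec)
  %w[workflow_workers activity_workers].to_h do |section|
    counts = (spec[section] || []).map do |w|
      w["number_of_workers"] || ASSUMED_DEFAULT_WORKERS
    end
    [section, counts]
  end
end

spec = JSON.parse(SPEC_JSON)
p describe_spec(spec)
```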
## For More Information
For a complete description of the runner’s specification, with examples of both configuring and using the runner, see [The Runner](docs.aws.amazon.com/amazonswf/latest/awsrbflowguide/the-runner.html) in the *AWS Flow Framework for Ruby Developer Guide*.
Class Method Summary
- .add_dir_to_load_path(path) ⇒ Object (private): Extends the load path so that the `require` of workflow and activity implementation files can succeed before the implementation classes are added to the workers.
- .all_subclasses(clazz) ⇒ Object (private): Searches the object space for all subclasses of `clazz`.
- .create_service_client(json_config) ⇒ Object (private)
- .expand_task_list(value) ⇒ Object (private): Used to support host-specific task lists.
- .get_classes(json_fragment, what) ⇒ Object (private): Gets the classes to run.
- .is_empty_field?(json_fragment, field_name) ⇒ Boolean (private)
- .load_classes(config_path, json_config) ⇒ Object (private): Loads activity and workflow classes.
- .load_config_json(path) ⇒ Object (private): Loads the runner specification from a JSON file (passed in with the `--file` parameter when run from the shell).
- .load_files(config_path, json_config, what) ⇒ Object (private): Runs the necessary `require` commands to load the code needed to run a module.
- .main ⇒ Object (private): Invoked from the shell.
- .parse_command_line(argv = ARGV) ⇒ Object (private): Interprets the command-line parameters passed in from the shell.
- .run(worker_spec) ⇒ Object: Invoked from code.
- .set_process_name(name) ⇒ Object (private)
- .set_user_agent(json_config) ⇒ Object (private)
- .setup_domain(json_config) ⇒ Object (private): Registers the domain if it is not already registered.
- .setup_signal_handling(workers) ⇒ Object (private): Sets up forwarding of the INT signal to child processes to support an orderly shutdown.
- .spawn_and_start_workers(json_fragment, process_name, worker) ⇒ Object (private): Spawns the workers.
- .start_activity_workers(swf, domain = nil, json_config) ⇒ Object (private): Starts the activity workers.
- .start_default_workers(swf, domain = nil, json_config) ⇒ Object (private): Starts workflow workers for the default workflow type `FlowDefaultWorkflowRuby`.
- .start_workers(domain = nil, json_config) ⇒ Object (private): Starts the workers and returns an array of process IDs (pids) for the worker processes.
- .start_workflow_workers(swf, domain = nil, json_config) ⇒ Object (private): Starts the workflow workers.
- .wait_for_child_processes(workers) ⇒ Object (private): Waits until all the child workers are finished.
Methods included from AWS::Flow
decision_context, on_windows?, start, start_workflow, version, with_retry, #workflow_client, workflow_client, #workflow_factory
Class Method Details
.add_dir_to_load_path(path) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Extends the load path so that the ‘require’ of workflow and activity implementation files can succeed before adding the implementation classes to the workers.
```ruby
# File 'lib/aws/runner.rb', line 361
def self.add_dir_to_load_path(path)
  raise ArgumentError.new("Invalid directory path: \"" + path.to_s + "\"") if not FileTest.directory? path
  $LOAD_PATH.unshift path.to_s
end
```
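The effect of prepending a directory to `$LOAD_PATH` can be seen in isolation. This standalone sketch uses a temporary directory and a hypothetical file `example_activities.rb` to mirror what `add_dir_to_load_path` does before `load_classes` runs: once the directory is on the load path, a bare `require` of a file inside it succeeds. `prepend_load_path` is a hypothetical name, not part of the runner.

```ruby
require "tmpdir"
require "pathname"

# Same check-then-prepend logic as add_dir_to_load_path, as a standalone method.
# File.directory? accepts a Pathname as well as a String.
def prepend_load_path(path)
  raise ArgumentError, "Invalid directory path: \"#{path}\"" unless File.directory?(path)
  $LOAD_PATH.unshift(path.to_s)
end

dir = Dir.mktmpdir
# A hypothetical implementation file living next to the runner spec.
File.write(File.join(dir, "example_activities.rb"), "module ExampleActivitiesLoaded; end\n")

# main passes a Pathname (the spec file's directory), so do the same here.
prepend_load_path(Pathname.new(dir))
require "example_activities"              # resolved through $LOAD_PATH
puts defined?(ExampleActivitiesLoaded)    # prints: constant
```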
.all_subclasses(clazz) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Searches the object space for all subclasses of `clazz`.

```ruby
# File 'lib/aws/runner.rb', line 75
def self.all_subclasses(clazz)
  ObjectSpace.each_object(Class).select { |klass| klass.is_a? clazz }
end
```
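Although the summary calls them subclasses, `klass.is_a? clazz` matches any class *object* that is an instance of `clazz`; for a module such as `AWS::Flow::Activities`, that means classes that `extend` it. The standalone sketch below uses a hypothetical stand-in module `Marker` in place of `AWS::Flow::Activities` to show which classes the object-space search picks up.

```ruby
# Hypothetical stand-in for AWS::Flow::Activities / AWS::Flow::Workflows.
module Marker; end

class ExtendsMarker
  extend Marker    # the class object itself becomes a Marker
end

class IncludesMarker
  include Marker   # only instances are Markers, not the class object
end

class PlainClass; end

# Same search as all_subclasses: every live Class whose object is_a? mod.
def classes_extending(mod)
  ObjectSpace.each_object(Class).select { |klass| klass.is_a?(mod) }
end

# ExtendsMarker is selected; IncludesMarker and PlainClass are not.
p classes_extending(Marker)
```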
.create_service_client(json_config) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
```ruby
# File 'lib/aws/runner.rb', line 297
def self.create_service_client(json_config)
  set_user_agent(json_config)
  swf = AWS::SimpleWorkflow.new
end
```
.expand_task_list(value) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Used to support host-specific task lists. When the string “|hostname|” is found in the task list it is replaced by the actual host name.
```ruby
# File 'lib/aws/runner.rb', line 124
def self.expand_task_list(value)
  raise ArgumentError.new unless value
  ret = value
  ret.gsub!("|hostname|", Socket.gethostname)
  ret
end
```
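The substitution itself is easy to try on a sample value. This sketch uses a hypothetical `expand_host_task_list` that deliberately differs from the listed source in one respect: it calls `gsub` instead of `gsub!`, so the caller's string (standing in for a task list read from the parsed spec) is left unchanged rather than edited in place.

```ruby
require "socket"

# Hypothetical non-mutating variant of expand_task_list: String#gsub
# returns a new string, leaving the configured value untouched.
def expand_host_task_list(value)
  raise ArgumentError, "task list must not be nil" unless value
  value.gsub("|hostname|", Socket.gethostname)
end

configured = "reports-|hostname|"
expanded = expand_host_task_list(configured)
puts expanded   # "reports-" followed by this machine's host name
```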
.get_classes(json_fragment, what) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Gets the classes to run.
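This method reads the `activity_classes` or `workflow_classes` field (selected by `what[:config_key]`) from a worker entry in the runner specification file and resolves each class name with `Object.const_get`. If the field is missing or empty, it instead autodiscovers the subclasses of AWS::Flow::Activities or AWS::Flow::Workflows (selected by `what[:clazz]`) in the object space. It raises an ArgumentError if no implementation class is found.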
```ruby
# File 'lib/aws/runner.rb', line 88
def self.get_classes(json_fragment, what)
  classes = json_fragment[what[:config_key]]
  if classes.nil? || classes.empty? then
    # discover the classes
    classes = all_subclasses( what[:clazz] )
  else
    # constantize the class names we just read from the config
    classes.map! { |c| Object.const_get(c) }
  end
  if classes.nil? || classes.empty? then
    raise ArgumentError.new "need at least one implementation class"
  end
  classes
end
```
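Both branches of `get_classes` can be exercised without the framework. In this sketch, `FakeActivities` and `Demo::GreetingActivity` are hypothetical stand-ins, and `resolve_classes` is a hypothetical helper. One deliberate difference: it uses `map` rather than `map!`, so the array of names in the parsed spec is not replaced by class objects.

```ruby
# Hypothetical stand-ins for AWS::Flow::Activities and an activity class.
module FakeActivities; end

module Demo
  class GreetingActivity
    extend FakeActivities
  end
end

# Mirrors get_classes: explicit names from the spec win; otherwise fall back
# to object-space discovery. Fails if neither yields any class.
def resolve_classes(fragment, config_key, marker)
  names = fragment[config_key]
  classes =
    if names.nil? || names.empty?
      ObjectSpace.each_object(Class).select { |k| k.is_a?(marker) }
    else
      # Object.const_get resolves namespaced names such as "Demo::GreetingActivity".
      names.map { |name| Object.const_get(name) }
    end
  raise ArgumentError, "need at least one implementation class" if classes.empty?
  classes
end

explicit = resolve_classes({ "activity_classes" => ["Demo::GreetingActivity"] },
                           "activity_classes", FakeActivities)
discovered = resolve_classes({}, "activity_classes", FakeActivities)
```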
.is_empty_field?(json_fragment, field_name) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
```ruby
# File 'lib/aws/runner.rb', line 132
def self.is_empty_field?(json_fragment, field_name)
  field = json_fragment[field_name]
  field.nil? || field.empty?
end
```
.load_classes(config_path, json_config) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
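Loads activity and workflow classes by calling load_files with the `activity_paths` and `workflow_paths` keys (defaulting to `flow/activities.rb` and `flow/workflows.rb`).
- config_path: the path to the config file, used to resolve relative references
- json_config: the content of the config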
```ruby
# File 'lib/aws/runner.rb', line 327
def self.load_classes(config_path, json_config)
  # load all classes for the activities
  load_files(config_path, json_config, {config_key: 'activity_paths',
             default_file: File.join('flow', 'activities.rb')})
  # load all the classes for the workflows
  load_files(config_path, json_config, {config_key: 'workflow_paths',
             default_file: File.join('flow', 'workflows.rb')})
end
```
.load_config_json(path) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Loads the runner specification from a JSON file (passed in with the `--file` parameter when run from the shell).
```ruby
# File 'lib/aws/runner.rb', line 371
def self.load_config_json(path)
  raise ArgumentError.new("Invalid file path: \"" + path.to_s + "\"") if not File.file? path
  config = JSON.parse(File.open(path) { |f| f.read })
end
```
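The loading step can be reproduced with the standard library alone. This sketch writes the minimal specification to a temporary `runspec.json` and reads it back with a hypothetical `load_spec`, which performs the same checks as `load_config_json` but uses `File.read` in place of `File.open(path) { |f| f.read }`; the result is the same parsed Hash.

```ruby
require "json"
require "tmpdir"

# Same checks as load_config_json: reject a path that is not a regular
# file, then parse it into a Hash with string keys.
def load_spec(path)
  raise ArgumentError, "Invalid file path: \"#{path}\"" unless File.file?(path)
  JSON.parse(File.read(path))
end

dir = Dir.mktmpdir
path = File.join(dir, "runspec.json")
File.write(path, <<~JSON)
  {
    "domain": { "name": "ExampleDomain" },
    "workflow_workers": [ { "task_list": "example_workflow_tasklist" } ],
    "activity_workers": [ { "task_list": "example_activity_tasklist" } ]
  }
JSON

spec = load_spec(path)
puts spec["domain"]["name"]   # prints ExampleDomain
```

Malformed input, such as a trailing comma after the last array, makes `JSON.parse` raise `JSON::ParserError`.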
.load_files(config_path, json_config, what) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Runs the necessary `require` commands to load the code needed to run a module. The default file is resolved relative to the directory of config_path; files listed under the config key are passed to `require` as given. Files that do not exist are skipped silently.
- config_path: the path to the config file, used to resolve relative references
- json_config: the content of the config
- what: what should be loaded. This is a hash expected to contain two keys:
  - :default_file: the file to load unless a specific list is provided
  - :config_key: the key of the config element that can contain a specific list of files to load
```ruby
# File 'lib/aws/runner.rb', line 153
def self.load_files(config_path, json_config, what)
  if is_empty_field?(json_config, what[:config_key]) then
    file = File.join(File.dirname(config_path), what[:default_file])
    # File.exist? (File.exists? was removed in Ruby 3.2)
    require file if File.exist? file
  else
    json_config[what[:config_key]].each { |file| require file if File.exist? file }
  end
end
```
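The default-file lookup is the part that depends on where the spec lives. This sketch creates a hypothetical `flow/activities.rb` next to a spec path in a temporary directory and calls a hypothetical `load_implementation_files` that follows the same two branches as `load_files`, returning the files it required so the outcome can be inspected.

```ruby
require "fileutils"
require "tmpdir"

# Hypothetical helper mirroring load_files. Returns the files it required.
def load_implementation_files(config_path, json_config, config_key, default_file)
  listed = json_config[config_key]
  candidates =
    if listed.nil? || listed.empty?
      # No explicit list: look for the default file next to the spec.
      [File.join(File.dirname(config_path), default_file)]
    else
      # Explicit list: paths are used as given.
      listed
    end
  # Missing files are skipped silently, as in the original.
  existing = candidates.select { |file| File.exist?(file) }
  existing.each { |file| require file }
  existing
end

dir = Dir.mktmpdir
FileUtils.mkdir_p(File.join(dir, "flow"))
File.write(File.join(dir, "flow", "activities.rb"), "DEFAULT_ACTIVITIES_LOADED = true\n")
config_path = File.join(dir, "runspec.json")   # the spec file itself need not exist here

loaded = load_implementation_files(config_path, {}, "activity_paths",
                                   File.join("flow", "activities.rb"))
```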
.main ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Invoked from the shell.
```ruby
# File 'lib/aws/runner.rb', line 418
def self.main
  options = parse_command_line
  config_path = options[:file]
  worker_spec = load_config_json(config_path)
  add_dir_to_load_path(Pathname.new(config_path).dirname)
  load_classes(config_path, worker_spec)
  run(worker_spec)
end
```
.parse_command_line(argv = ARGV) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Interprets the command-line parameters passed in from the shell.
The `--file` parameter (short form: `-f`) is required and must provide the path to the runner configuration file.
```ruby
# File 'lib/aws/runner.rb', line 382
def self.parse_command_line(argv = ARGV)
  options = {}
  optparse = OptionParser.new do |opts|
    opts.on('-f', '--file JSON_CONFIG_FILE', "Mandatory JSON config file") do |f|
      options[:file] = f
    end
  end

  optparse.parse!(argv)

  # The `--file` parameter is not optional.
  raise OptionParser::MissingArgument.new("file") if options[:file].nil?

  return options
end
```
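Because `parse_command_line` takes `argv` as a parameter, its behavior is easy to check without a shell. This sketch copies the same OptionParser setup into a hypothetical standalone `parse_runner_args` and shows both ways the option can fail: omitted entirely (the explicit check) or given without a value (raised by `parse!` itself).

```ruby
require "optparse"

# Standalone copy of the parse_command_line logic (hypothetical name).
def parse_runner_args(argv)
  options = {}
  parser = OptionParser.new do |opts|
    opts.on("-f", "--file JSON_CONFIG_FILE", "Mandatory JSON config file") do |f|
      options[:file] = f
    end
  end
  parser.parse!(argv)   # raises MissingArgument for a bare "-f"
  raise OptionParser::MissingArgument, "file" if options[:file].nil?
  options
end

# Returns a hash whose :file entry is "runspec.json".
p parse_runner_args(["-f", "runspec.json"])
```

Note that `parse!` also removes the recognized options from `argv`, so anything left in the array afterwards was not consumed by the parser.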
.run(worker_spec) ⇒ Object
Invoked from code. This helper method starts the runner from code, which is especially helpful for debugging. It starts the workers, forwards INT signals to them, and blocks until all child processes have exited.
- worker_spec: Hash representation of the JSON worker spec
```ruby
# File 'lib/aws/runner.rb', line 405
def self.run(worker_spec)
  workers = start_workers(worker_spec)
  setup_signal_handling(workers)

  # Hang there until killed: this process is used to relay signals to
  # children to support and facilitate an orderly shutdown.
  wait_for_child_processes(workers)
end
```
.set_process_name(name) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
```ruby
# File 'lib/aws/runner.rb', line 68
def self.set_process_name(name)
  $0 = name
end
```
.set_user_agent(json_config) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
```ruby
# File 'lib/aws/runner.rb', line 289
def self.set_user_agent(json_config)
  # set the UserAgent prefix for all clients
  if json_config['user_agent_prefix'] then
    AWS.config(user_agent_prefix: json_config['user_agent_prefix'])
  end
end
```
.setup_domain(json_config) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Registers the domain if it is not already registered.
```ruby
# File 'lib/aws/runner.rb', line 53
def self.setup_domain(json_config)
  set_user_agent(json_config)

  # If domain is not provided, use the default ruby flow domain
  domain = json_config['domain'] || { 'name' => FlowConstants.defaults[:domain] }

  # If retention period is not provided, default it to 7 days
  retention = domain['retention_in_days'] || FlowConstants::RETENTION_DEFAULT

  AWS::Flow::Utilities.register_domain(domain['name'], retention.to_s)
end
```
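The defaulting in `setup_domain` can be isolated from the AWS call. This sketch returns the name and retention string that would be handed to `register_domain`. `resolve_domain` is hypothetical; `default_name` stands in for `FlowConstants.defaults[:domain]`, whose value is not shown on this page, and the 7-day fallback is taken from the source comment about `FlowConstants::RETENTION_DEFAULT`.

```ruby
# Mirrors the defaulting in setup_domain without contacting AWS.
# ASSUMPTION: default_name replaces FlowConstants.defaults[:domain].
def resolve_domain(json_config, default_name:, default_retention_days: 7)
  domain = json_config["domain"] || { "name" => default_name }
  retention = domain["retention_in_days"] || default_retention_days
  # register_domain receives the retention period as a string.
  [domain["name"], retention.to_s]
end

resolve_domain({ "domain" => { "name" => "ExampleDomain" } }, default_name: "MyDefault")
# => ["ExampleDomain", "7"]
resolve_domain({}, default_name: "MyDefault")
# => ["MyDefault", "7"]
```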
.setup_signal_handling(workers) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Sets up forwarding of the INT signal to child processes to support an orderly shutdown.
```ruby
# File 'lib/aws/runner.rb', line 340
def self.setup_signal_handling(workers)
  Signal.trap("INT") { workers.each { |w| Process.kill("INT", w) } }
end
```
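To see the forwarding in isolation, this sketch installs an INT handler that relays SIGINT to a list of child pids, as `setup_signal_handling` does. `forward_int_to_children` is a hypothetical name. Unlike the original, it rescues `Errno::ESRCH` so that a child that has already exited does not abort the loop, and it returns the handler so it can also be called directly. It assumes a platform with POSIX signals and `fork` (not Windows).

```ruby
# Hypothetical helper modeled on setup_signal_handling: on SIGINT, send
# SIGINT to every pid in the list. Returns the installed handler.
def forward_int_to_children(pids)
  handler = proc do
    pids.each do |pid|
      begin
        Process.kill("INT", pid)
      rescue Errno::ESRCH
        # The child has already exited; nothing to forward.
      end
    end
  end
  Signal.trap("INT", &handler)
  handler
end
```

In the runner, the parent then blocks in `wait_for_child_processes`, so a SIGINT sent to the runner's own process (for example with `kill -INT <pid>`) is relayed to every worker.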
.spawn_and_start_workers(json_fragment, process_name, worker) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
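Spawns the workers. It forks `number_of_workers` child processes (or FlowConstants::NUM_OF_WORKERS_DEFAULT if the key is not set), names each one `process_name`, and starts `worker` in it; only the first child registers the worker's types. It returns the child pids.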
```ruby
# File 'lib/aws/runner.rb', line 106
def self.spawn_and_start_workers(json_fragment, process_name, worker)
  workers = []
  num_of_workers = json_fragment['number_of_workers'] || FlowConstants::NUM_OF_WORKERS_DEFAULT
  should_register = true
  num_of_workers.times do
    workers << fork do
      set_process_name(process_name)
      worker.start(should_register)
    end
    should_register = false
  end
  workers
end
```
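The "register only once" trick relies on `fork` copying the parent's memory at the moment of the fork. This sketch reproduces the pattern with a block in place of `worker.start`; `spawn_children` is a hypothetical name, and the exit codes 10 and 20 are arbitrary markers used only to make the result observable. It assumes a platform that supports `fork`.

```ruby
# Hypothetical sketch of the spawn_and_start_workers pattern: fork `count`
# children, tell only the first one to register, and return their pids.
def spawn_children(count, process_name)
  should_register = true
  pids = []
  count.times do
    pids << fork do
      $0 = process_name        # same idea as set_process_name
      yield should_register
      exit!(0)                 # skip the parent's at_exit handlers in the child
    end
    # The child already has its own copy of should_register, so flipping it
    # here only affects children forked later.
    should_register = false
  end
  pids
end

# Each child exits with 10 if it was told to register, 20 otherwise.
pids = spawn_children(3, "example-worker") { |register| exit!(register ? 10 : 20) }
codes = pids.map { |pid| Process.wait2(pid).last.exitstatus }
p codes   # prints [10, 20, 20]
```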
.start_activity_workers(swf, domain = nil, json_config) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Starts the activity workers.
The activities run by the workers consist of each class that extends AWS::Flow::Activities in the paths provided in the `activity_paths` section of the runner specification file, or that is loaded from `require` statements in the `activities.rb` file.
If the activity classes are regular Ruby classes, this method creates a proxy AWS::Flow::Activities class for each regular Ruby class loaded (using AWS::Flow::Templates.make_activity_class) and adds the proxy implementation to the ActivityWorker.
```ruby
# File 'lib/aws/runner.rb', line 174
def self.start_activity_workers(swf, domain = nil, json_config)
  workers = []
  domain = setup_domain(json_config) if domain.nil?

  # This will be used later to start default workflow workers. If the
  # 'activity_workers' and 'default_workers' keys are not specified in the
  # json spec, then we don't start any default workers. Hence this value
  # is defaulted to 0.
  number_of_default_workers = 0

  # TODO: logger
  # start the workers for each spec
  if json_config['activity_workers']
    json_config['activity_workers'].each do |w|
      # If number of forks is not provided, it will automatically default to 20
      # within the ActivityWorker
      fork_count = w['number_of_forks_per_worker']
      task_list = expand_task_list(w['task_list']) if w['task_list']

      # Get activity classes
      classes = get_classes(w, {config_key: 'activity_classes',
                                clazz: AWS::Flow::Activities})

      # If task_list is not provided, use the name of the first class as the
      # task_list for this worker
      task_list ||= "#{classes.first}"

      # Create a worker
      worker = ActivityWorker.new(swf.client, domain, task_list) {{ execution_workers: fork_count }}
      classes.each do |c|
        c = AWS::Flow::Templates.make_activity_class(c) unless c.is_a?(AWS::Flow::Activities)
        worker.add_implementation(c)
      end

      # We add 1 default worker for each activity worker.
      number_of_default_workers += w['number_of_workers'] || FlowConstants::NUM_OF_WORKERS_DEFAULT

      # start as many workers as desired in child processes
      workers << spawn_and_start_workers(w, "activity-worker", worker)
    end

    # Create the config for default workers if it's not passed in the
    # json_config
    if json_config['default_workers'].nil? || json_config['default_workers']['number_of_workers'].nil?
      json_config['default_workers'] = {
        'number_of_workers' => number_of_default_workers
      }
    end
  end

  # Start the default workflow workers
  workers << start_default_workers(swf, domain, json_config)

  return workers
end
```
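The sizing of the default workflow workers is plain Hash manipulation, so it can be checked on its own. This sketch reproduces that part of `start_activity_workers`: one default worker per activity worker process, unless the spec already sets `default_workers.number_of_workers`. `fill_default_workers` is hypothetical, and `ASSUMED_DEFAULT` is an assumed stand-in for `FlowConstants::NUM_OF_WORKERS_DEFAULT`, whose value is not shown on this page.

```ruby
# ASSUMPTION: stands in for FlowConstants::NUM_OF_WORKERS_DEFAULT.
ASSUMED_DEFAULT = 1

# Mirrors how start_activity_workers fills in default_workers. Like the
# original, it edits json_config in place and only acts when the
# activity_workers key is present.
def fill_default_workers(json_config, default_count = ASSUMED_DEFAULT)
  activity_workers = json_config["activity_workers"]
  return json_config unless activity_workers

  total = activity_workers.sum { |w| w["number_of_workers"] || default_count }
  dw = json_config["default_workers"]
  if dw.nil? || dw["number_of_workers"].nil?
    json_config["default_workers"] = { "number_of_workers" => total }
  end
  json_config
end

cfg = { "activity_workers" => [{ "number_of_workers" => 3 }, {}] }
fill_default_workers(cfg)
p cfg["default_workers"]
```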
.start_default_workers(swf, domain = nil, json_config) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Starts workflow workers for the default workflow type `FlowDefaultWorkflowRuby`. If the `default_workers` key is not set in the JSON spec, the number of default workers is set equal to the number of activity workers (this count is filled in by start_activity_workers). Default workers are used for processing workflow templates.
```ruby
# File 'lib/aws/runner.rb', line 237
def self.start_default_workers(swf, domain = nil, json_config)
  workers = []
  domain = setup_domain(json_config) if domain.nil?
  if json_config['default_workers']
    # Also register the default result activity type in the given domain
    AWS::Flow::Templates::Utils.register_default_result_activity(domain)

    klass = AWS::Flow::Templates.default_workflow
    task_list = FlowConstants.defaults[:task_list]
    # Create a worker
    worker = WorkflowWorker.new(swf.client, domain, task_list, klass)
    # This will take care of both registering and starting the default workers
    workers << spawn_and_start_workers(json_config['default_workers'], "default-worker", worker)
  end
  workers
end
```
.start_workers(domain = nil, json_config) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Starts the workers and returns an array of process IDs (pids) for the worker processes.
```ruby
# File 'lib/aws/runner.rb', line 306
def self.start_workers(domain = nil, json_config)
  workers = []

  swf = create_service_client(json_config)

  workers << start_activity_workers(swf, domain, json_config)
  workers << start_workflow_workers(swf, domain, json_config)

  # needed to avoid returning nested arrays based on the calls above
  workers.flatten!
end
```
.start_workflow_workers(swf, domain = nil, json_config) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Starts the workflow workers.
The workflows run by the workers consist of each class that extends AWS::Flow::Workflows in the paths provided in the `workflow_paths` section of the runner specification file, or that is loaded from `require` statements in the `workflows.rb` file.
```ruby
# File 'lib/aws/runner.rb', line 263
def self.start_workflow_workers(swf, domain = nil, json_config)
  workers = []
  domain = setup_domain(json_config) if domain.nil?

  # TODO: logger
  # start the workers for each spec
  if json_config['workflow_workers']
    json_config['workflow_workers'].each do |w|
      task_list = expand_task_list(w['task_list'])

      # Get workflow classes
      classes = get_classes(w, {config_key: 'workflow_classes',
                                clazz: AWS::Flow::Workflows})

      # Create a worker
      worker = WorkflowWorker.new(swf.client, domain, task_list, *classes)

      # Start as many workers as desired in child processes
      workers << spawn_and_start_workers(w, "workflow-worker", worker)
    end
  end

  return workers
end
```
.wait_for_child_processes(workers) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Waits until all the child workers are finished.
TODO: use a logger
```ruby
# File 'lib/aws/runner.rb', line 348
def self.wait_for_child_processes(workers)
  until workers.empty?
    puts "waiting on workers " + workers.to_s + " to complete"
    dead_guys = Process.waitall
    dead_guys.each { |pid, status| workers.delete(pid); puts pid.to_s + " exited" }
  end
end
```
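`Process.waitall` blocks until every child of the current process has exited and returns an array of `[pid, status]` pairs, which is what lets the loop above remove each reaped pid. This standalone sketch, with the hypothetical name `wait_for_children`, follows the same loop and collects exit codes. It adds one guard the original lacks: if `waitall` returns nothing (no children left, for example because a listed pid is not a child), it stops instead of looping forever. It assumes a platform that supports `fork`.

```ruby
# Hypothetical standalone version of wait_for_child_processes that
# returns a Hash of pid => exit status code.
def wait_for_children(pids)
  remaining = pids.dup
  exit_codes = {}
  until remaining.empty?
    reaped = Process.waitall
    break if reaped.empty?   # added guard: no children left to wait for
    reaped.each do |pid, status|
      remaining.delete(pid)
      exit_codes[pid] = status.exitstatus
    end
  end
  exit_codes
end

# exit! skips at_exit handlers inherited from the parent.
kids = [fork { exit!(0) }, fork { exit!(5) }]
codes = wait_for_children(kids)
```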