Module: AWS::Flow::Runner
- Includes: AWS::Flow
- Defined in: lib/aws/runner.rb
Overview
The Runner is a command-line utility that can spawn workflow and activity workers according to a specification that you provide in a [JSON](http://json.org/) configuration file. It is available beginning with version 1.3.0 of the AWS Flow Framework for Ruby.
## Invoking the Runner
It is invoked like so:
    aws-flow-ruby -f runspec.json
Here, `runspec.json` is a local JSON file that specifies how to run your activities and workflows.
## The Runner Specification File
The runner is configured by passing it a JSON-formatted configuration file. Here is a minimal example, providing only the required fields:
    {
      "domain": { "name": "ExampleDomain" },
      "workflow_workers": [
        {
          "task_list": "example_workflow_tasklist"
        }
      ],
      "activity_workers": [
        {
          "task_list": "example_activity_tasklist"
        }
      ]
    }
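The runner reads this file with `JSON.parse`, so it must be strictly valid JSON. The following Ruby sketch builds the minimal specification as a hash, round-trips it through JSON text, and checks for the fields shown above. `missing_required_fields` is a hypothetical helper written for this illustration (it is not part of the framework), and it assumes the required fields are exactly the ones in the minimal example.

```ruby
require 'json'

# Hypothetical helper (not part of AWS::Flow::Runner): returns the names of
# required fields from the minimal specification that are missing in `spec`.
def missing_required_fields(spec)
  missing = []
  domain = spec['domain']
  missing << 'domain.name' unless domain.is_a?(Hash) && domain['name']
  %w[workflow_workers activity_workers].each do |key|
    workers = spec[key]
    unless workers.is_a?(Array) && !workers.empty?
      missing << key
      next
    end
    workers.each_with_index do |w, i|
      missing << "#{key}[#{i}].task_list" unless w['task_list']
    end
  end
  missing
end

minimal = {
  'domain' => { 'name' => 'ExampleDomain' },
  'workflow_workers' => [{ 'task_list' => 'example_workflow_tasklist' }],
  'activity_workers' => [{ 'task_list' => 'example_activity_tasklist' }]
}

# Round-trip through JSON text, as the runner parses the file with JSON.parse.
spec = JSON.parse(JSON.pretty_generate(minimal))
p missing_required_fields(spec)               # prints []
p missing_required_fields({ 'domain' => {} })
# prints ["domain.name", "workflow_workers", "activity_workers"]
```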
## For More Information
For a complete description of the runner’s specification, with examples of both configuring and using the runner, see [The Runner](http://docs.aws.amazon.com/amazonswf/latest/awsrbflowguide/the-runner.html) in the *AWS Flow Framework for Ruby Developer Guide*.
Class Method Summary
- .add_dir_to_load_path(path) ⇒ Object private
  Extends the load path so that the `require` of workflow and activity implementation files can succeed before adding the implementation classes to the workers.
- .add_implementations(worker, json_fragment, what) ⇒ Object private
  Used to add implementations to workers; see `.get_classes` for more information.
- .all_subclasses(clazz) ⇒ Object private
  Searches the object space for all classes that extend the module `clazz` (that is, classes for which `is_a?(clazz)` is true).
- .create_service_client(json_config) ⇒ Object private
- .expand_task_list(value) ⇒ Object private
  Used to support host-specific task lists.
- .get_classes(json_fragment, what) ⇒ Object private
  Gets the classes to run.
- .is_empty_field?(json_fragment, field_name) ⇒ Boolean private
- .load_config_json(path) ⇒ Object private
  Loads the runner specification from a JSON file (passed in with the `--file` parameter when run from the shell).
- .load_files(config_path, json_config, what) ⇒ Object private
  Runs the necessary `require` commands to load the code needed to run a module.
- .main ⇒ Object private
  Invoked from the shell.
- .parse_command_line(argv = ARGV) ⇒ Object private
  Interprets the command-line parameters passed in from the shell.
- .set_process_name(name) ⇒ Object private
- .setup_domain(json_config) ⇒ Object private
  Registers the domain if it is not already registered.
- .setup_signal_handling(workers) ⇒ Object private
  Sets up forwarding of the `INT` signal to child processes to support an orderly shutdown.
- .spawn_and_start_workers(json_fragment, process_name, worker) ⇒ Object private
  Spawns the workers.
- .start_activity_workers(swf, domain = nil, config_path, json_config) ⇒ Object private
  Starts the activity workers.
- .start_workers(domain = nil, config_path, json_config) ⇒ Object private
  Starts the workers and returns an array of process IDs (pids) for the worker processes.
- .start_workflow_workers(swf, domain = nil, config_path, json_config) ⇒ Object private
  Starts the workflow workers.
- .wait_for_child_processes(workers) ⇒ Object private
  Waits until all the child workers are finished.
Methods included from AWS::Flow
decision_context, version, with_retry, #workflow_client, workflow_client, #workflow_factory
Class Method Details
.add_dir_to_load_path(path) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Extends the load path so that the ‘require’ of workflow and activity implementation files can succeed before adding the implementation classes to the workers.
    # File 'lib/aws/runner.rb', line 291
    def self.add_dir_to_load_path(path)
      raise ArgumentError.new("Invalid directory path: \"" + path.to_s + "\"") if not FileTest.directory? path
      $LOAD_PATH.unshift path.to_s
    end
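`main` passes the directory that contains the runner specification to this method, so implementation files laid out next to the specification can be required by a relative name. A self-contained sketch of the effect, using a temporary directory and a hypothetical `flow/activities.rb` file that only defines a marker constant:

```ruby
require 'tmpdir'

Dir.mktmpdir do |spec_dir|
  # Hypothetical layout: <spec_dir>/flow/activities.rb next to the spec file.
  Dir.mkdir(File.join(spec_dir, 'flow'))
  File.write(File.join(spec_dir, 'flow', 'activities.rb'), "EXAMPLE_ACTIVITIES_LOADED = true\n")

  # Same effect as add_dir_to_load_path(spec_dir).
  raise ArgumentError, "Invalid directory path: \"#{spec_dir}\"" unless File.directory?(spec_dir)
  $LOAD_PATH.unshift(spec_dir)

  require 'flow/activities'      # resolved through the extended load path
  p EXAMPLE_ACTIVITIES_LOADED    # prints true
end
```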
.add_implementations(worker, json_fragment, what) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Used to add implementations to workers; see `.get_classes` for more information.
    # File 'lib/aws/runner.rb', line 111
    def self.add_implementations(worker, json_fragment, what)
      classes = get_classes(json_fragment, what)
      classes.each { |c| worker.add_implementation(c) }
    end
.all_subclasses(clazz) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
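Searches the object space for all classes for which `is_a?(clazz)` is true. For a module such as `AWS::Flow::Activities`, these are the classes that extend the module (and their subclasses), not subclasses in the inheritance sense.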
    # File 'lib/aws/runner.rb', line 79
    def self.all_subclasses(clazz)
      ObjectSpace.each_object(Class).select { |klass| klass.is_a? clazz }
    end
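The selection relies on `is_a?` applied to class objects, which is true when the class extends `clazz` (or inherits from a class that does), but not when it merely inherits from a class. A self-contained illustration; `ExampleActivities`, `GreeterActivities`, `Base`, and the other names are hypothetical stand-ins:

```ruby
# Stand-in for a framework module such as AWS::Flow::Activities.
module ExampleActivities; end

class GreeterActivities
  extend ExampleActivities          # the class object now is_a?(ExampleActivities)
end

class LoudGreeterActivities < GreeterActivities; end   # inherits the extension

class Base; end
class Derived < Base; end

# Same selection expression as all_subclasses(ExampleActivities).
found = ObjectSpace.each_object(Class).select { |klass| klass.is_a? ExampleActivities }

p found.include?(GreeterActivities)       # prints true
p found.include?(LoudGreeterActivities)   # prints true
p Derived.is_a?(Base)                     # prints false: Derived is a Class, not a Base
p Derived < Base                          # prints true: the usual subclass test
```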
.create_service_client(json_config) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
    # File 'lib/aws/runner.rb', line 241
    def self.create_service_client(json_config)
      # set the UserAgent prefix for all clients
      if json_config['user_agent_prefix'] then
        AWS.config(user_agent_prefix: json_config['user_agent_prefix'])
      end

      swf = AWS::SimpleWorkflow.new
    end
.expand_task_list(value) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Used to support host-specific task lists. When the string “|hostname|” appears in the task list name, it is replaced with the name of the current host (as returned by `Socket.gethostname`).
    # File 'lib/aws/runner.rb', line 135
    def self.expand_task_list(value)
      raise ArgumentError.new unless value
      ret = value
      ret.gsub!("|hostname|", Socket.gethostname)
      ret
    end
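Because the listing calls `gsub!` on the string it receives, it also rewrites the task list value stored in the parsed configuration. A non-mutating sketch of the same substitution, using the hypothetical name `expand_host_task_list`:

```ruby
require 'socket'

# Hypothetical non-mutating variant of expand_task_list: returns a new string
# with every "|hostname|" replaced by this machine's host name.
def expand_host_task_list(value)
  raise ArgumentError, 'task list is required' unless value
  value.gsub('|hostname|', Socket.gethostname)
end

configured = 'activity_tasks_|hostname|'
expanded = expand_host_task_list(configured)
puts expanded      # e.g. activity_tasks_build01 on a host named build01
puts configured    # still activity_tasks_|hostname|
```

Giving each host its own task list this way lets a specification file be shared across machines while tasks can still be routed to one particular host.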
.get_classes(json_fragment, what) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Gets the classes to run.
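This method extracts and validates the `activity_classes` or `workflow_classes` field from the runner specification file. If the field is absent or empty, it instead discovers the classes by searching the object space for classes that extend `AWS::Flow::Activities` or `AWS::Flow::Workflows`, respectively. It raises an `ArgumentError` if no implementation class is found.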
    # File 'lib/aws/runner.rb', line 92
    def self.get_classes(json_fragment, what)
      classes = json_fragment[what[:config_key]]
      if classes.nil? || classes.empty? then
        # discover the classes
        classes = all_subclasses( what[:clazz] )
      else
        # constantize the class names we just read from the config
        classes.map! { |c| Object.const_get(c) }
      end
      if classes.nil? || classes.empty? then
        raise ArgumentError.new "need at least one implementation class"
      end
      classes
    end
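The two branches can be sketched without the framework. `pick_classes`, `ExampleWorkflows`, and `OrderWorkflow` are hypothetical names. The sketch uses the non-destructive `map`, so the configuration array keeps its strings, whereas the listing's `map!` replaces them in place.

```ruby
# Stand-in for AWS::Flow::Workflows and a workflow class that extends it.
module ExampleWorkflows; end
class OrderWorkflow
  extend ExampleWorkflows
end

# Illustrative version of the get_classes logic.
def pick_classes(json_fragment, config_key, marker)
  names = json_fragment[config_key]
  classes =
    if names.nil? || names.empty?
      # Autodiscovery: every loaded class whose class object is_a?(marker).
      ObjectSpace.each_object(Class).select { |klass| klass.is_a?(marker) }
    else
      # Explicit list: resolve each configured name to its constant.
      names.map { |name| Object.const_get(name) }
    end
  raise ArgumentError, 'need at least one implementation class' if classes.empty?
  classes
end

p pick_classes({ 'workflow_classes' => ['OrderWorkflow'] }, 'workflow_classes', ExampleWorkflows)
# prints [OrderWorkflow]
p pick_classes({}, 'workflow_classes', ExampleWorkflows)
# prints [OrderWorkflow]
```

A configured name that does not resolve to a constant makes `Object.const_get` raise `NameError`, so misspelled class names fail at startup rather than at task time.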
.is_empty_field?(json_fragment, field_name) ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
    # File 'lib/aws/runner.rb', line 143
    def self.is_empty_field?(json_fragment, field_name)
      field = json_fragment[field_name]
      field.nil? || field.empty?
    end
.load_config_json(path) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Loads the runner specification from a JSON file (passed in with the `--file` parameter when run from the shell).
    # File 'lib/aws/runner.rb', line 301
    def self.load_config_json(path)
      raise ArgumentError.new("Invalid file path: \"" + path.to_s + "\"") if not File.file? path
      config = JSON.parse(File.open(path) { |f| f.read })
    end
.load_files(config_path, json_config, what) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Runs the necessary “require” commands to load the code needed to run a module.
- config_path: the path of the configuration file, used to resolve the default file relative to that file's directory
- json_config: the content of the configuration
- what: what should be loaded; a hash expected to contain two keys:
  - :default_file: the file to load unless a specific list is provided
  - :config_key: the key of the configuration element that can contain a specific list of files to load
    # File 'lib/aws/runner.rb', line 164
    def self.load_files(config_path, json_config, what)
      if is_empty_field?(json_config, what[:config_key]) then
        file = File.join(File.dirname(config_path), what[:default_file])
        require file if File.exists? file
      else
        json_config[what[:config_key]].each { |file| require file if File.exists? file }
      end
    end
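A worked example of how the default file is located; the specification path is hypothetical. Only this default is resolved against the specification's directory, while entries listed under the `config_key` are passed to `require` unchanged.

```ruby
# Hypothetical location of the runner specification file.
config_path = '/home/user/app/runspec.json'

# Defaults used by start_activity_workers and start_workflow_workers.
activities_default = File.join(File.dirname(config_path), File.join('flow', 'activities.rb'))
workflows_default  = File.join(File.dirname(config_path), File.join('flow', 'workflows.rb'))

puts activities_default   # prints /home/user/app/flow/activities.rb
puts workflows_default    # prints /home/user/app/flow/workflows.rb
```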
.main ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Invoked from the shell.
    # File 'lib/aws/runner.rb', line 332
    def self.main
      options = parse_command_line
      config_path = options[:file]
      config = load_config_json( config_path )
      add_dir_to_load_path( Pathname.new(config_path).dirname )
      domain = setup_domain(config)
      workers = start_workers(domain, config_path, config)
      setup_signal_handling(workers)

      # Hang there until killed: this process is used to relay signals to
      # children to support and facilitate an orderly shutdown.
      wait_for_child_processes(workers)
    end
.parse_command_line(argv = ARGV) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Interprets the command-line parameters passed in from the shell.
The `--file` parameter (short: `-f`) is required and must provide the path to the runner configuration file; if it is missing, `OptionParser::MissingArgument` is raised.
    # File 'lib/aws/runner.rb', line 312
    def self.parse_command_line(argv = ARGV)
      options = {}
      optparse = OptionParser.new do |opts|
        opts.on('-f', '--file JSON_CONFIG_FILE', "Mandatory JSON config file") do |f|
          options[:file] = f
        end
      end

      optparse.parse!(argv)

      # The `--file` parameter is not optional.
      raise OptionParser::MissingArgument.new("file") if options[:file].nil?

      return options
    end
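The same option handling can be exercised directly by passing argument arrays instead of reading `ARGV`. `parse_runner_args` is a hypothetical wrapper written for this sketch, with the same option definition as the listing.

```ruby
require 'optparse'

# Hypothetical wrapper with the same option definition as parse_command_line.
def parse_runner_args(argv)
  options = {}
  optparse = OptionParser.new do |opts|
    opts.on('-f', '--file JSON_CONFIG_FILE', 'Mandatory JSON config file') do |f|
      options[:file] = f
    end
  end
  optparse.parse!(argv)
  # The --file parameter is not optional.
  raise OptionParser::MissingArgument.new('file') if options[:file].nil?
  options
end

p parse_runner_args(['-f', 'runspec.json'])[:file]       # prints "runspec.json"
p parse_runner_args(['--file', 'runspec.json'])[:file]   # prints "runspec.json"

begin
  parse_runner_args([])
rescue OptionParser::MissingArgument => e
  puts "error: #{e.message}"
end
```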
.set_process_name(name) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
    # File 'lib/aws/runner.rb', line 72
    def self.set_process_name(name)
      $0 = name
    end
.setup_domain(json_config) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Registers the domain if it is not already registered.
    # File 'lib/aws/runner.rb', line 52
    def self.setup_domain(json_config)

      swf = create_service_client(json_config)

      domain = json_config['domain']
      # If retention period is not provided, default it to 7 days
      retention = domain['retention_in_days'] || FlowConstants::RETENTION_DEFAULT

      begin
        swf.client.register_domain({
          name: domain['name'],
          workflow_execution_retention_period_in_days: retention.to_s
        })
      rescue AWS::SimpleWorkflow::Errors::DomainAlreadyExistsFault => e
        # possibly log an INFO/WARN if the domain already exists.
      end
      return AWS::SimpleWorkflow::Domain.new( domain['name'] )
    end
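The domain part of the specification is small: `domain.name` is required and `domain.retention_in_days` is optional. The sketch below mirrors how the listing builds the `register_domain` arguments. `RETENTION_DEFAULT_DAYS = 7` is an assumption taken from the source comment, not the verified value of `FlowConstants::RETENTION_DEFAULT`, and `register_domain_params` is a hypothetical helper.

```ruby
require 'json'

# Assumption: mirrors the "default it to 7 days" comment in setup_domain.
RETENTION_DEFAULT_DAYS = 7

# Hypothetical helper: builds the options hash passed to register_domain.
def register_domain_params(json_config)
  domain = json_config['domain']
  retention = domain['retention_in_days'] || RETENTION_DEFAULT_DAYS
  {
    name: domain['name'],
    # The listing sends the retention period as a string.
    workflow_execution_retention_period_in_days: retention.to_s
  }
end

with_retention = JSON.parse('{"domain": {"name": "ExampleDomain", "retention_in_days": 30}}')
without_retention = JSON.parse('{"domain": {"name": "ExampleDomain"}}')

p register_domain_params(with_retention)[:workflow_execution_retention_period_in_days]     # prints "30"
p register_domain_params(without_retention)[:workflow_execution_retention_period_in_days]  # prints "7"
```

Because `DomainAlreadyExistsFault` is rescued, starting the runner against an existing domain does not fail; the existing domain's retention period is left as it is.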
.setup_signal_handling(workers) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Forwards the `INT` signal received by the runner process to every child worker process, to support an orderly shutdown.
    # File 'lib/aws/runner.rb', line 270
    def self.setup_signal_handling(workers)
      Signal.trap("INT") { workers.each { |w| Process.kill("INT", w) } }
    end
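A POSIX-only sketch of the same forwarding pattern that runs on its own: two hypothetical child processes install their own `INT` handlers, the parent relays `INT` to them as `setup_signal_handling` does, and the parent then reaps them. A pipe makes the parent send the signal only after every child is ready.

```ruby
reader, writer = IO.pipe

children = Array.new(2) do
  fork do
    reader.close
    Signal.trap('INT') { exit!(0) }   # child: exit cleanly when INT arrives
    writer.write('.')                 # report that the handler is installed
    writer.close
    sleep                             # block until a signal arrives
  end
end
writer.close
reader.read(children.size)            # wait until every child is ready
reader.close

# Parent: relay INT to every child, as setup_signal_handling does.
Signal.trap('INT') { children.each { |pid| Process.kill('INT', pid) } }

Process.kill('INT', Process.pid)      # simulate Ctrl-C on the parent
exit_codes = Process.waitall.map { |_pid, status| status.exitstatus }
Signal.trap('INT', 'DEFAULT')         # restore the default handler

p exit_codes                          # prints [0, 0]
```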
.spawn_and_start_workers(json_fragment, process_name, worker) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
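Spawns the workers: forks `number_of_workers` child processes (defaulting to `FlowConstants::NUM_OF_WORKERS_DEFAULT` when the field is absent), each of which sets its process name to `process_name` and starts `worker`, and returns the array of child pids.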
    # File 'lib/aws/runner.rb', line 119
    def self.spawn_and_start_workers(json_fragment, process_name, worker)
      workers = []
      num_of_workers = json_fragment['number_of_workers'] || FlowConstants::NUM_OF_WORKERS_DEFAULT
      num_of_workers.times do
        workers << fork do
          set_process_name(process_name)
          worker.start()
        end
      end
      workers
    end
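A POSIX-only sketch of the spawn pattern that runs without the framework. `spawn_children` is a hypothetical helper, and `exit!(i)` stands in for `worker.start()` so that each child ends immediately with a distinct status. As in the listing, the parent gets back only the child pids.

```ruby
# Hypothetical helper mirroring spawn_and_start_workers.
def spawn_children(count, process_name)
  Array.new(count) do |i|
    fork do
      $0 = process_name   # what set_process_name does
      exit!(i)            # stand-in for worker.start(); exits with status i
    end
  end
end

pids = spawn_children(3, 'activity-worker')
reaped = Process.waitall                                 # [[pid, Process::Status], ...]
p reaped.map(&:first).sort == pids.sort                  # prints true
p reaped.map { |_pid, status| status.exitstatus }.sort   # prints [0, 1, 2]
```

Because `fork` copies the parent process, each child starts with its own copy of the already-configured `worker` object, which is why the listing can pass in a single worker instance and start it in several processes.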
.start_activity_workers(swf, domain = nil, config_path, json_config) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Starts the activity workers.
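The activities run by the workers are the classes named in the worker's `activity_classes` field or, when that field is absent or empty, every loaded class that extends `AWS::Flow::Activities`. Implementation files are loaded from the paths listed in the `activity_paths` section of the runner specification file or, when that section is absent, from `flow/activities.rb` in the directory that contains the specification file (together with any files it loads through `require` statements).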
    # File 'lib/aws/runner.rb', line 181
    def self.start_activity_workers(swf, domain = nil, config_path, json_config)
      workers = []
      # load all classes for the activities
      load_files(config_path, json_config, {config_key: 'activity_paths',
                                            default_file: File.join('flow', 'activities.rb')})
      domain = setup_domain(json_config) if domain.nil?

      # TODO: logger
      # start the workers for each spec
      json_config['activity_workers'].each do |w|
        # If number of forks is not provided, it will automatically default to 20
        # within the ActivityWorker
        fork_count = w['number_of_forks_per_worker']
        task_list = expand_task_list(w['task_list'])

        # create a worker
        worker = ActivityWorker.new(swf.client, domain, task_list, *w['activities']) {{ execution_workers: fork_count }}
        add_implementations(worker, w, {config_key: 'activity_classes',
                                        clazz: AWS::Flow::Activities})

        # start as many workers as desired in child processes
        workers << spawn_and_start_workers(w, "activity-worker", worker)
      end

      return workers
    end
.start_workers(domain = nil, config_path, json_config) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Starts the workers and returns an array of process IDs (pids) for the worker processes.
    # File 'lib/aws/runner.rb', line 254
    def self.start_workers(domain = nil, config_path, json_config)
      workers = []

      swf = create_service_client(json_config)

      workers << start_activity_workers(swf, domain, config_path, json_config)
      workers << start_workflow_workers(swf, domain, config_path, json_config)

      # needed to avoid returning nested arrays based on the calls above
      workers.flatten!
    end
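Each of `start_activity_workers` and `start_workflow_workers` returns an array holding one array of pids per worker specification, so `start_workers` has three levels of nesting before it flattens. The pid values below are made up for illustration:

```ruby
# Made-up pids: two activity worker specs and one workflow worker spec.
activity_pids = [[101, 102], [103]]
workflow_pids = [[201, 202]]

workers = []
workers << activity_pids
workers << workflow_pids
p workers        # prints [[[101, 102], [103]], [[201, 202]]]

workers.flatten!
p workers        # prints [101, 102, 103, 201, 202]

# Array#flatten! returns nil when nothing needed flattening, so its return
# value is only safe to use when the array is known to be nested, as here.
p [1, 2].flatten!   # prints nil
```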
.start_workflow_workers(swf, domain = nil, config_path, json_config) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Starts the workflow workers.
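The workflows run by the workers are the classes named in the worker's `workflow_classes` field or, when that field is absent or empty, every loaded class that extends `AWS::Flow::Workflows`. Implementation files are loaded from the paths listed in the `workflow_paths` section of the runner specification file or, when that section is absent, from `flow/workflows.rb` in the directory that contains the specification file (together with any files it loads through `require` statements).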
    # File 'lib/aws/runner.rb', line 216
    def self.start_workflow_workers(swf, domain = nil, config_path, json_config)
      workers = []
      # load all the classes for the workflows
      load_files(config_path, json_config, {config_key: 'workflow_paths',
                                            default_file: File.join('flow', 'workflows.rb')})
      domain = setup_domain(json_config) if domain.nil?

      # TODO: logger
      # start the workers for each spec
      json_config['workflow_workers'].each do |w|
        task_list = expand_task_list(w['task_list'])

        # create a worker
        worker = WorkflowWorker.new(swf.client, domain, task_list, *w['workflows'])
        add_implementations(worker, w, {config_key: 'workflow_classes',
                                        clazz: AWS::Flow::Workflows})

        # start as many workers as desired in child processes
        workers << spawn_and_start_workers(w, "workflow-worker", worker)
      end

      return workers
    end
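Besides the required fields, the listings on this page read several optional keys from the specification. The Ruby sketch below assembles one specification that uses them and prints it as JSON. The key names come from the listings; every value (domain name, prefix, paths, class names, counts) is a hypothetical placeholder. In the listings, `number_of_forks_per_worker` is read only for activity workers. Keys whose value format is not described here, such as the splatted `activities` and `workflows` arrays, are left out.

```ruby
require 'json'

# Key names are taken from the runner listings; all values are placeholders.
spec = {
  'domain' => { 'name' => 'ExampleDomain', 'retention_in_days' => 10 },
  'user_agent_prefix' => 'example-app',
  'activity_paths' => ['lib/example_activities.rb'],
  'workflow_paths' => ['lib/example_workflows.rb'],
  'activity_workers' => [
    {
      'task_list' => 'example_activity_tasklist_|hostname|',  # expanded per host
      'activity_classes' => ['ExampleActivities'],           # skips autodiscovery
      'number_of_workers' => 2,            # child processes for this spec
      'number_of_forks_per_worker' => 5    # passed as execution_workers
    }
  ],
  'workflow_workers' => [
    {
      'task_list' => 'example_workflow_tasklist',
      'workflow_classes' => ['ExampleWorkflow'],
      'number_of_workers' => 1
    }
  ]
}

puts JSON.pretty_generate(spec)
```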
.wait_for_child_processes(workers) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Waits until all the child workers are finished.
TODO: use a logger
    # File 'lib/aws/runner.rb', line 278
    def self.wait_for_child_processes(workers)
      until workers.empty?
        puts "waiting on workers " + workers.to_s + " to complete"
        dead_guys = Process.waitall
        dead_guys.each { |pid, status| workers.delete(pid); puts pid.to_s + " exited" }
      end
    end
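`Process.waitall` blocks until every child has exited, so the loop in the listing normally finishes in a single pass and reports all exits at once. A POSIX-only sketch of a variant that reports each child as soon as it exits, using `Process.wait2`; the two short-lived children are hypothetical stand-ins for workers.

```ruby
# Two stand-in workers that exit after different delays.
workers = Array.new(2) { |i| fork { sleep(0.1 * (i + 1)); exit!(0) } }
exited = []

until workers.empty?
  pid, status = Process.wait2        # blocks until any one child exits
  workers.delete(pid)
  exited << pid
  puts "#{pid} exited with status #{status.exitstatus}"
end
```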