Module: AWS::Flow::Runner

Includes:
AWS::Flow
Defined in:
lib/aws/runner.rb

Class Method Summary

Methods included from AWS::Flow

decision_context, version, with_retry, #workflow_client, workflow_client, #workflow_factory

Class Method Details

.add_dir_to_load_path(path) ⇒ Object

Extends the load path so that `require` of the workflow and activity implementation files succeeds before the implementation classes are added to the workers.

Raises:

  • (ArgumentError)


251
252
253
254
# File 'lib/aws/runner.rb', line 251

# Prepends +path+ to $LOAD_PATH so that implementation files located in that
# directory can be loaded with a plain `require`.
#
# @param path [String, Pathname] directory to prepend (stored as a String)
# @return [Array<String>] the updated $LOAD_PATH
# @raise [ArgumentError] if +path+ is not an existing directory
def self.add_dir_to_load_path(path)
  unless FileTest.directory?(path)
    raise ArgumentError, "Invalid directory path: \"#{path}\""
  end

  $LOAD_PATH.unshift(path.to_s)
end

.add_implementations(worker, json_fragment, what) ⇒ Object

Adds the implementation classes selected by get_classes to the given worker.



103
104
105
106
# File 'lib/aws/runner.rb', line 103

# Registers each implementation class chosen by get_classes (configured names
# or auto-discovered classes) on +worker+ via worker.add_implementation.
#
# @param worker [Object] a workflow or activity worker
# @param json_fragment [Hash] the worker spec from the config
# @param what [Hash] :config_key and :clazz, forwarded to get_classes
# @return [Array<Class>] the classes that were added
def self.add_implementations(worker, json_fragment, what)
  get_classes(json_fragment, what).each do |impl|
    worker.add_implementation(impl)
  end
end

.all_subclasses(clazz) ⇒ Object

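Searches the object space for all classes that are instances of clazz (klass.is_a?(clazz)), e.g. classes that extend the module clazz; ordinary subclasses of a class are not matched.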



80
81
82
# File 'lib/aws/runner.rb', line 80

# Returns every class in the object space for which klass.is_a?(clazz) holds.
#
# Despite the name, this does not find ordinary subclasses: Bar.is_a?(Foo) is
# false even when Bar < Foo. It matches classes whose singleton class includes
# +clazz+, i.e. classes that `extend` the module +clazz+. Every class matches
# when +clazz+ is Class, Module, Object or Kernel. The callers in this file pass
# AWS::Flow::Activities / AWS::Flow::Workflows, presumably modules that
# implementation classes extend (confirm against those modules).
#
# @param clazz [Module] module (or class) to test membership against
# @return [Array<Class>] matching classes, in ObjectSpace iteration order
def self.all_subclasses(clazz)
  ObjectSpace.each_object(Class).each_with_object([]) do |candidate, found|
    found << candidate if candidate.is_a?(clazz)
  end
end

.create_service_client(json_config) ⇒ Object



205
206
207
208
209
210
211
212
# File 'lib/aws/runner.rb', line 205

# Creates the SimpleWorkflow service object used by the runner.
#
# When the config contains a truthy 'user_agent_prefix', it is first applied
# through AWS.config, the SDK's global configuration. The original intent is to
# set the UserAgent prefix for all clients.
#
# @param json_config [Hash] parsed runner configuration
# @return [AWS::SimpleWorkflow] a new service object
def self.create_service_client(json_config)
  prefix = json_config['user_agent_prefix']
  AWS.config(user_agent_prefix: prefix) if prefix

  AWS::SimpleWorkflow.new
end

.expand_task_list(value) ⇒ Object

Supports host-specific task lists: every occurrence of the string “|hostname|” in the task list name is replaced by the host name.

Raises:

  • (ArgumentError)


123
124
125
126
127
128
# File 'lib/aws/runner.rb', line 123

# Expands host-specific task list names. Every occurrence of the literal
# placeholder "|hostname|" is replaced with Socket.gethostname, so
# "work-|hostname|" becomes "work-<host>".
#
# Uses non-destructive gsub. The caller's string, typically a value inside the
# parsed config, is left untouched, and frozen strings are accepted.
#
# @param value [String] task list name, possibly containing "|hostname|"
# @return [String] a new string with all placeholders expanded
# @raise [ArgumentError] if +value+ is nil or false
def self.expand_task_list(value)
  raise ArgumentError.new unless value
  value.gsub("|hostname|", Socket.gethostname)
end

.get_classes(json_fragment, what) ⇒ Object

used to extract and validate the ‘activity_classes’ and ‘workflow_classes’ fields from the config, or autodiscover subclasses in the ObjectSpace



87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/aws/runner.rb', line 87

# Resolves the implementation classes for a worker spec.
#
# If json_fragment[what[:config_key]] lists class names, each name is resolved
# with Object.const_get. Otherwise the classes are auto-discovered with
# all_subclasses(what[:clazz]).
#
# The config array is NOT modified. The old `map!` replaced the names with class
# objects, which made a second call on the same fragment fail in const_get.
#
# @param json_fragment [Hash] the worker spec from the config
# @param what [Hash] :config_key (config field holding class names) and
#   :clazz (module used for auto-discovery)
# @return [Array<Class>] the resolved classes
# @raise [ArgumentError] if no implementation class is found
# @raise [NameError] if a configured class name is not defined
def self.get_classes(json_fragment, what)
  names = json_fragment[what[:config_key]]
  classes =
    if names.nil? || names.empty?
      # nothing configured: discover the classes
      all_subclasses(what[:clazz])
    else
      # constantize the class names read from the config
      names.map { |name| Object.const_get(name) }
    end
  if classes.nil? || classes.empty?
    raise ArgumentError.new "need at least one implementation class"
  end
  classes
end

.is_empty_field?(json_fragment, field_name) ⇒ Boolean

Returns:

  • (Boolean)


130
131
132
133
# File 'lib/aws/runner.rb', line 130

# Tells whether a config field is unusable: true when +field_name+ is missing
# from +json_fragment+, maps to nil, or maps to an empty value such as
# "" / [] / {}.
#
# @param json_fragment [Hash] section of the parsed config
# @param field_name [String] key to inspect
# @return [Boolean]
def self.is_empty_field?(json_fragment, field_name)
  value = json_fragment[field_name]
  return true if value.nil?

  value.empty?
end

.load_config_json(path) ⇒ Object

loads the configuration from a JSON file

Raises:

  • (ArgumentError)


259
260
261
262
# File 'lib/aws/runner.rb', line 259

# Reads and parses the JSON configuration file at +path+.
#
# @param path [String] path to the config file
# @return [Hash] the parsed configuration (string keys)
# @raise [ArgumentError] if +path+ is not a regular file
# @raise [JSON::ParserError] if the file is not valid JSON
def self.load_config_json(path)
  unless File.file?(path)
    raise ArgumentError, "Invalid file path: \"#{path}\""
  end

  JSON.parse(File.open(path, &:read))
end

.load_files(config_path, json_config, what) ⇒ Object

This is used to issue the necessary “require” commands to load the code needed to run a module

config_path: the path of the config file; the default file is resolved relative to the directory containing it

json_config: the parsed content of the config file

what: what should be loaded. This is a hash expected to contain two keys:

- :default_file : the file to load unless a specific list is provided
- :config_key : the key of the config element which can contain a
        specific list of files to load


145
146
147
148
149
150
151
152
# File 'lib/aws/runner.rb', line 145

# Issues the `require`s that load workflow/activity implementation code.
#
# When json_config has no entry (or an empty one) under what[:config_key],
# the single file what[:default_file] is resolved against the directory of
# +config_path+ and required if it exists. Otherwise each path listed under
# that key is required if it exists. Listed paths are passed to `require` as
# given and are not resolved against the config directory. Missing files are
# skipped without error.
#
# Uses File.exist?: the deprecated File.exists? alias was removed in Ruby 3.2.
#
# @param config_path [String] path of the config file
# @param json_config [Hash] parsed config
# @param what [Hash] :config_key (config field listing files) and
#   :default_file (file used when that field is empty)
def self.load_files(config_path, json_config, what)
  if is_empty_field?(json_config, what[:config_key])
    file = File.join(File.dirname(config_path), what[:default_file])
    require file if File.exist?(file)
  else
    json_config[what[:config_key]].each { |file| require file if File.exist?(file) }
  end
end

.main ⇒ Object



281
282
283
284
285
286
287
288
289
290
291
292
293
294
# File 'lib/aws/runner.rb', line 281

# Command-line entry point.
#
# Parses ARGV for the mandatory --file option and loads that JSON config. It
# makes the config's directory requireable, registers/looks up the domain,
# forks the configured workers and installs signal forwarding to them. It then
# blocks until every worker process has exited.
def self.main
  config_path = parse_command_line[:file]
  config = load_config_json(config_path)

  # implementation files living next to the config must be requireable
  add_dir_to_load_path(Pathname.new(config_path).dirname)

  domain = setup_domain(config)
  pids = start_workers(domain, config_path, config)
  setup_signal_handling(pids)

  # stay alive until killed: this process only relays signals to its children,
  # which allows an orderly shutdown
  wait_for_child_processes(pids)
end

.parse_command_line(argv = ARGV) ⇒ Object

Raises:

  • (OptionParser::MissingArgument)


265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
# File 'lib/aws/runner.rb', line 265

# Parses the runner's command-line options.
#
# Recognized options are consumed from +argv+ in place (OptionParser#parse!).
#
# @param argv [Array<String>] arguments to parse (defaults to ARGV)
# @return [Hash] { file: <path to the JSON config file> }
# @raise [OptionParser::MissingArgument] if -f/--file was not given
def self.parse_command_line(argv = ARGV)
  result = {}

  parser = OptionParser.new
  parser.on('-f', '--file JSON_CONFIG_FILE', "Mandatory JSON config file") do |path|
    result[:file] = path
  end
  parser.parse!(argv)

  # OptionParser enforces the option's argument, not the option's presence
  raise OptionParser::MissingArgument.new("file") if result[:file].nil?

  result
end

.set_process_name(name) ⇒ Object



75
76
77
# File 'lib/aws/runner.rb', line 75

# Sets the current process name ($0, aliased as $PROGRAM_NAME). The forked
# worker processes use this to label themselves.
#
# @param name [String] the new process name
# @return [String] +name+
def self.set_process_name(name)
  $PROGRAM_NAME = name
end

.setup_domain(json_config) ⇒ Object

Registers the domain unless it already exists, and returns it.



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/aws/runner.rb', line 56

# Registers the SWF domain described by json_config['domain'] and returns a
# Domain object for it.
#
# The retention period comes from 'retention_in_days', falling back to
# FlowConstants::RETENTION_DEFAULT, and is sent as a string. A
# DomainAlreadyExistsFault from registration is ignored, so an existing domain
# is simply reused.
#
# @param json_config [Hash] parsed config; expects a 'domain' hash with 'name'
# @return [AWS::SimpleWorkflow::Domain] the domain named in the config
def self.setup_domain(json_config)
  swf = create_service_client(json_config)

  domain_config = json_config['domain']
  domain_name = domain_config['name']
  retention_days = domain_config['retention_in_days'] || FlowConstants::RETENTION_DEFAULT

  begin
    swf.client.register_domain({
      name: domain_name,
      workflow_execution_retention_period_in_days: retention_days.to_s
    })
  rescue AWS::SimpleWorkflow::Errors::DomainAlreadyExistsFault
    # an existing domain is fine; could log at INFO/WARN once a logger exists
  end

  AWS::SimpleWorkflow::Domain.new(domain_name)
end

.setup_signal_handling(workers) ⇒ Object

setup forwarding of signals to child processes, to facilitate and support orderly shutdown



234
235
236
# File 'lib/aws/runner.rb', line 234

# Installs a SIGINT handler in this (parent) process that forwards SIGINT to
# every pid in +workers+, so that an interrupt shuts the children down in an
# orderly way.
#
# +workers+ is captured by reference: pids removed from the array later are no
# longer signalled.
#
# A worker that has already exited and been reaped makes Process.kill raise
# Errno::ESRCH. That pid is skipped, so the signal still reaches the remaining
# workers instead of the handler aborting part-way through the list.
#
# @param workers [Array<Integer>] child process ids
# @return [Object] the previously installed INT handler (from Signal.trap)
def self.setup_signal_handling(workers)
  Signal.trap("INT") do
    workers.each do |pid|
      begin
        Process.kill("INT", pid)
      rescue Errno::ESRCH
        # worker already gone; nothing to forward to
      end
    end
  end
end

.spawn_and_start_workers(json_fragment, process_name, worker) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
# File 'lib/aws/runner.rb', line 108

# Forks json_fragment['number_of_workers'] child processes, falling back to
# FlowConstants::NUM_OF_WORKERS_DEFAULT. Each child renames itself to
# +process_name+ and calls worker.start. Block-form fork exits the child once
# the block finishes.
#
# @param json_fragment [Hash] the worker spec from the config
# @param process_name [String] name given to each child process
# @param worker [#start] the worker each child runs
# @return [Array<Integer>] pids of the forked children (parent side)
def self.spawn_and_start_workers(json_fragment, process_name, worker)
  count = json_fragment['number_of_workers'] || FlowConstants::NUM_OF_WORKERS_DEFAULT

  count.times.map do
    fork do
      set_process_name(process_name)
      worker.start
    end
  end
end

.start_activity_workers(swf, domain = nil, config_path, json_config) ⇒ Object



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/aws/runner.rb', line 154

# Loads the activity implementation files and forks the configured activity
# worker processes.
#
# Activity code is required via load_files. It uses the 'activity_paths' config
# entry, or flow/activities.rb next to the config file. For every spec in the
# config's 'activity_workers' list, an ActivityWorker is created on the spec's
# task list ("|hostname|" expanded) and its implementation classes are added.
# Child processes are then forked to run it.
#
# @param swf [AWS::SimpleWorkflow] service object whose client the workers use
# @param domain [AWS::SimpleWorkflow::Domain, nil] domain to poll; obtained via
#   setup_domain when nil
# @param config_path [String] path of the JSON config file
# @param json_config [Hash] parsed config
# @return [Array<Array<Integer>>] one array of child pids per worker spec
#   (empty when the config has no 'activity_workers' section)
def self.start_activity_workers(swf, domain = nil, config_path, json_config)
  workers = []
  # load all classes for the activities
  load_files(config_path, json_config, {config_key: 'activity_paths',
               default_file: File.join('flow', 'activities.rb')})
  domain = setup_domain(json_config) if domain.nil?

  # TODO: logger
  # start the workers for each spec; a config without an 'activity_workers'
  # section starts none instead of failing with NoMethodError on nil
  (json_config['activity_workers'] || []).each do |w|
    # If number of forks is not provided, it will automatically default to 20
    # within the ActivityWorker
    fork_count = w['number_of_forks_per_worker']
    task_list = expand_task_list(w['task_list'])

    # create a worker; the hash returned by the block is presumably consumed by
    # ActivityWorker as its options (confirm against ActivityWorker)
    worker = ActivityWorker.new(swf.client, domain, task_list, *w['activities']) {{ max_workers: fork_count }}
    add_implementations(worker, w, {config_key: 'activity_classes',
               clazz: AWS::Flow::Activities})

    # start as many workers as desired in child processes
    workers << spawn_and_start_workers(w, "activity-worker", worker)
  end

  workers
end

.start_workers(domain = nil, config_path, json_config) ⇒ Object

this will start all the workers and return an array of pids for the worker processes



218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/aws/runner.rb', line 218

# Starts all activity and workflow workers and returns the pids of the forked
# worker processes as one flat array.
#
# A single service client is created here and shared by both kinds of workers.
#
# @param domain [AWS::SimpleWorkflow::Domain, nil] domain to poll; each helper
#   obtains it via setup_domain when nil
# @param config_path [String] path of the JSON config file
# @param json_config [Hash] parsed config
# @return [Array<Integer>] pids of all worker processes
def self.start_workers(domain = nil, config_path, json_config)
  swf = create_service_client(json_config)

  activity_pids = start_activity_workers(swf, domain, config_path, json_config)
  workflow_pids = start_workflow_workers(swf, domain, config_path, json_config)

  # the helpers return nested arrays of pids; non-bang `flatten` always returns
  # the array, whereas `flatten!` returns nil when nothing needed flattening
  [activity_pids, workflow_pids].flatten
end

.start_workflow_workers(swf, domain = nil, config_path, json_config) ⇒ Object



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/aws/runner.rb', line 181

# Loads the workflow implementation files and forks the configured workflow
# worker processes.
#
# Workflow code is required via load_files. It uses the 'workflow_paths' config
# entry, or flow/workflows.rb next to the config file. For every spec in the
# config's 'workflow_workers' list, a WorkflowWorker is created on the spec's
# task list ("|hostname|" expanded) and its implementation classes are added.
# Child processes are then forked to run it.
#
# @param swf [AWS::SimpleWorkflow] service object whose client the workers use
# @param domain [AWS::SimpleWorkflow::Domain, nil] domain to poll; obtained via
#   setup_domain when nil
# @param config_path [String] path of the JSON config file
# @param json_config [Hash] parsed config
# @return [Array<Array<Integer>>] one array of child pids per worker spec
#   (empty when the config has no 'workflow_workers' section)
def self.start_workflow_workers(swf, domain = nil, config_path, json_config)
  workers = []
  # load all the classes for the workflows
  load_files(config_path, json_config, {config_key: 'workflow_paths',
               default_file: File.join('flow', 'workflows.rb')})
  domain = setup_domain(json_config) if domain.nil?

  # TODO: logger
  # start the workers for each spec; a config without a 'workflow_workers'
  # section starts none instead of failing with NoMethodError on nil
  (json_config['workflow_workers'] || []).each do |w|
    task_list = expand_task_list(w['task_list'])

    # create a worker
    worker = WorkflowWorker.new(swf.client, domain, task_list, *w['workflows'])
    add_implementations(worker, w, {config_key: 'workflow_classes',
               clazz: AWS::Flow::Workflows})

    # start as many workers as desired in child processes
    workers << spawn_and_start_workers(w, "workflow-worker", worker)
  end

  workers
end

.wait_for_child_processes(workers) ⇒ Object

Waits until all the child worker processes have exited. TODO: use a logger.



240
241
242
243
244
245
246
# File 'lib/aws/runner.rb', line 240

# Blocks until every pid in +workers+ has exited.
#
# Children are reaped one at a time with Process.wait, and each reaped pid is
# deleted from +workers+. The array is modified in place, so other holders of
# the same array see the pruned list. The loop stops when Process.wait raises
# Errno::ECHILD (no children left). Otherwise a pid that is not, or is no
# longer, a child of this process would keep the loop spinning forever, which
# is what happened with Process.waitall returning [].
#
# TODO: use a logger
#
# @param workers [Array<Integer>] pids of the worker processes (mutated)
def self.wait_for_child_processes(workers)
  until workers.empty?
    puts "waiting on workers " + workers.to_s + " to complete"
    begin
      pid = Process.wait
    rescue Errno::ECHILD
      # no child processes remain; the pids left in +workers+ cannot be waited on
      break
    end
    workers.delete(pid)
    puts pid.to_s + " exited"
  end
end