Class: Simplerubysteps::Tool

Inherits:
Object
  • Object
show all
Defined in:
lib/simplerubysteps/tool.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Tool

Returns a new instance of Tool.



15
16
17
18
19
20
# File 'lib/simplerubysteps/tool.rb', line 15

# Builds one AWS SDK client per service used by the methods of this class.
# Every client is constructed without explicit arguments.
def initialize
  # Used for creating, updating, describing and deleting the stack.
  @cloudformation_client = Aws::CloudFormation::Client.new
  # Used for uploading artifacts and emptying the deployment bucket.
  @s3_client = Aws::S3::Client.new
  # Used for starting/describing executions and sending task results.
  @states_client = Aws::States::Client.new
  # Used for tailing the Lambda function's log group.
  @logs_client = Aws::CloudWatchLogs::Client.new
end

Instance Method Details

#cloudformation_template ⇒ Object



171
172
173
174
175
# File 'lib/simplerubysteps/tool.rb', line 171

# Loads the CloudFormation template that sits next to this source file.
#
# @return [String] the complete contents of statemachine.yaml
def cloudformation_template
  template_path = "#{File.dirname(__FILE__)}/statemachine.yaml"
  File.read(template_path)
end

#create_zip(zip_file, files_by_name) ⇒ Object



141
142
143
144
145
146
147
148
# File 'lib/simplerubysteps/tool.rb', line 141

# Writes the given files into a zip archive.
#
# @param zip_file [String] path of the archive; it is opened with
#   +create: true+
# @param files_by_name [Hash{String => String}] entry name inside the
#   archive mapped to the path of the file to add
def create_zip(zip_file, files_by_name)
  Zip::File.open(zip_file, create: true) do |zipfile|
    files_by_name.each do |entry_name, source_path|
      zipfile.add entry_name, source_path
    end
  end
end

#deploy ⇒ Object



200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
# File 'lib/simplerubysteps/tool.rb', line 200

# Deploys the workflow found in the current directory.
#
# The stack name is taken from the current directory name. The method
# performs up to three stack operations with the same template, switching
# the "DeployLambda" / "DeployStepfunctions" parameters on step by step:
#
# 1. create the stack with both switches off (only when no stack exists yet),
# 2. enable the Lambda function (only when the stack has no
#    "LambdaFunctionARN" output yet),
# 3. enable the state machine, pointing at the freshly uploaded artifacts.
#
# Progress is printed to standard output.
def deploy
  current_stack_outputs = stack_outputs(stack_name_from_current_dir)

  # stack_outputs returns nil when the stack does not exist: create it with
  # nothing deployed so that the "DeployBucket" output becomes available.
  unless current_stack_outputs
    current_stack_outputs = stack_create(stack_name_from_current_dir, cloudformation_template, {
      "DeployLambda" => "no",
      "DeployStepfunctions" => "no",
      "LambdaS3" => "",
      "StepFunctionsS3" => "",
      "StateMachineType" => "",
    })

    puts "Deployment bucket created"
  end

  deploy_bucket = current_stack_outputs["DeployBucket"]

  puts "Deployment bucket: #{deploy_bucket}"

  # Zip the library files together with the workflow files and upload the
  # archive under a name derived from its SHA1 digest.
  function_zip_temp = Tempfile.new("function")
  create_zip function_zip_temp.path, my_lib_files.merge(workflow_files)
  lambda_sha = Digest::SHA1.file function_zip_temp.path
  lambda_zip_name = "function-#{lambda_sha}.zip"
  upload_file_to_s3 deploy_bucket, lambda_zip_name, function_zip_temp.path

  puts "Uploaded: #{lambda_zip_name}"

  # The Lambda ARN is needed below to render the state machine, so the
  # function is deployed first when the stack does not expose it yet.
  unless current_stack_outputs["LambdaFunctionARN"]
    current_stack_outputs = stack_update(stack_name_from_current_dir, cloudformation_template, {
      "DeployLambda" => "yes",
      "DeployStepfunctions" => "no",
      "LambdaS3" => lambda_zip_name,
      "StepFunctionsS3" => "",
      "StateMachineType" => "",
    })

    puts "Lambda function created"
  end

  lambda_arn = current_stack_outputs["LambdaFunctionARN"]

  puts "Lambda function: #{lambda_arn}"

  # ./workflow.rb is loaded in a child ruby process; it is expected to set
  # the global $sm, whose #kind is printed and captured here.
  workflow_type = `ruby -e 'require "./workflow.rb";puts $sm.kind'`.strip

  # Second child process: renders the state machine with the Lambda ARN passed
  # through the environment. The output is parsed and serialized again
  # (presumably to validate and normalize the JSON — TODO confirm).
  state_machine_json = JSON.parse(`LAMBDA_FUNCTION_ARN=#{lambda_arn} ruby -e 'require "./workflow.rb";puts $sm.render.to_json'`).to_json
  state_machine_json_sha = Digest::SHA1.hexdigest state_machine_json
  state_machine_json_name = "statemachine-#{state_machine_json_sha}.json"
  upload_to_s3 deploy_bucket, state_machine_json_name, state_machine_json

  puts "Uploaded: #{state_machine_json_name}"

  # Final update: both switches on, referencing the uploaded artifacts.
  current_stack_outputs = stack_update(stack_name_from_current_dir, cloudformation_template, {
    "DeployLambda" => "yes",
    "DeployStepfunctions" => "yes",
    "LambdaS3" => lambda_zip_name,
    "StepFunctionsS3" => state_machine_json_name,
    "StateMachineType" => workflow_type,
  })

  # stack_update adds :no_update when CloudFormation had nothing to change.
  if current_stack_outputs[:no_update]
    puts "Stack not updated"
  else
    puts "Stack updated"
  end

  puts "State machine: #{current_stack_outputs["StepFunctionsStateMachineARN"]}"
end

#describe_execution(execution_arn) ⇒ Object



283
284
285
286
287
# File 'lib/simplerubysteps/tool.rb', line 283

# Fetches the current description of a Step Functions execution.
#
# @param execution_arn [String] ARN of the execution to look up
# @return [Object] the client's describe_execution response
def describe_execution(execution_arn)
  @states_client.describe_execution(execution_arn: execution_arn)
end

#destroy ⇒ Object



185
186
187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/simplerubysteps/tool.rb', line 185

# Tears down the stack named after the current directory: empties the
# deployment bucket, deletes the stack and waits for the deletion to finish.
# Progress is printed to standard output.
#
# @raise [RuntimeError] when the stack does not exist or exposes no
#   "DeployBucket" output
def destroy
  stack_name = stack_name_from_current_dir
  current_stack_outputs = stack_outputs(stack_name)
  # stack_outputs returns nil for a missing stack, so guard before indexing.
  deploy_bucket = current_stack_outputs && current_stack_outputs["DeployBucket"]
  raise "No CloudFormation stack to destroy" unless deploy_bucket

  # The bucket is emptied before the stack (which owns it) is deleted.
  empty_s3_bucket deploy_bucket

  puts "Bucket emptied: #{deploy_bucket}"

  @cloudformation_client.delete_stack(stack_name: stack_name)
  @cloudformation_client.wait_until(:stack_delete_complete, stack_name: stack_name)

  puts "Stack deleted"
end

#dir_files(base_dir, glob) ⇒ Object



150
151
152
153
154
155
156
157
# File 'lib/simplerubysteps/tool.rb', line 150

# Collects the regular files below a directory that match a glob pattern.
#
# @param base_dir [String] directory to search; expanded to an absolute path
# @param glob [String] glob pattern, evaluated relative to base_dir
# @return [Hash{String => String}] path relative to base_dir mapped to the
#   path returned by the glob
def dir_files(base_dir, glob)
  root = File.expand_path(base_dir)
  Dir.glob("#{root}/#{glob}").each_with_object({}) do |path, by_name|
    next unless File.file?(path)

    # Drop the root directory and the separator that follows it.
    relative_name = File.expand_path(path)[(root.length + 1)..-1]
    by_name[relative_name] = path
  end
end

#empty_s3_bucket(bucket_name) ⇒ Object



135
136
137
138
139
# File 'lib/simplerubysteps/tool.rb', line 135

# Deletes every object listed in the given bucket.
#
# A single list_objects_v2 call returns only one page of keys, so the
# listing is repeated with the continuation token until the response is no
# longer truncated.
#
# @param bucket_name [String] name of the bucket to empty
def empty_s3_bucket(bucket_name)
  list_params = { bucket: bucket_name }

  loop do
    page = @s3_client.list_objects_v2(list_params)

    page.contents.each do |object|
      @s3_client.delete_object(bucket: bucket_name, key: object.key)
    end

    break unless page.is_truncated

    list_params[:continuation_token] = page.next_continuation_token
  end
end

#log(extract_pattern = nil) ⇒ Object



177
178
179
180
181
182
183
# File 'lib/simplerubysteps/tool.rb', line 177

# Tails the log group of the deployed Lambda function.
#
# @param extract_pattern [String, nil] passed through to tail_follow_logs
# @raise [RuntimeError] when the stack for the current directory does not exist
def log(extract_pattern = nil)
  outputs = stack_outputs(stack_name_from_current_dir)
  raise "State Machine is not deployed" unless outputs

  log_group = "/aws/lambda/#{outputs["LambdaFunctionName"]}"
  tail_follow_logs(log_group, extract_pattern)
end

#my_lib_files ⇒ Object



167
168
169
# File 'lib/simplerubysteps/tool.rb', line 167

# Lists the Ruby files of this library (everything below the parent of this
# file's directory), leaving out files whose relative name contains
# "tool.rb".
#
# @return [Hash{String => String}] relative name mapped to file path, as
#   produced by dir_files
def my_lib_files
  library_root = File.dirname(__FILE__) + "/.."
  # The dot is escaped so that only a literal "tool.rb" is filtered out.
  dir_files(library_root, "**/*.rb").reject { |name, _path| name =~ /tool\.rb/ }
end

#run ⇒ Object



339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
# File 'lib/simplerubysteps/tool.rb', line 339

# Command line entry point: parses ARGV and runs the selected sub-command
# (deploy, destroy, log, start or task-success).
#
# Global options are consumed first, the next argument is taken as the
# command name, and the rest is parsed with that command's option parser.
# An unknown command or an invalid option prints a usage message and exits
# with status 1.
def run
  options = {
    :wait => false,
    :input => $stdin,
  }

  # Adds the -h/--help switch shared by all sub-command parsers: it prints
  # the usage of the parser it is attached to and exits.
  help_switch = lambda do |opts|
    opts.on("-h", "--help", "Display this help message") do
      puts opts
      exit
    end
  end

  subcommands = {
    "deploy" => OptionParser.new do |opts|
      opts.banner = "Usage: #{$0} deploy [options]"

      help_switch.call(opts)
    end,
    "destroy" => OptionParser.new do |opts|
      opts.banner = "Usage: #{$0} destroy [options]"

      help_switch.call(opts)
    end,
    "log" => OptionParser.new do |opts|
      opts.banner = "Usage: #{$0} log [options]"

      opts.on("--extract_pattern VALUE", "Wait for and extract pattern") do |value|
        options[:extract_pattern] = value
      end

      help_switch.call(opts)
    end,
    "start" => OptionParser.new do |opts|
      opts.banner = "Usage: #{$0} start [options]"

      opts.on("--wait", "Wait for STANDARD state machine to complete") do
        options[:wait] = true
      end

      opts.on("--input VALUE", "/path/to/file (STDIN will be used per default)") do |value|
        options[:input] = File.new(value)
      end

      help_switch.call(opts)
    end,
    "task-success" => OptionParser.new do |opts|
      opts.banner = "Usage: #{$0} task-success [options]"

      opts.on("--input VALUE", "/path/to/file (STDIN will be used per default)") do |value|
        options[:input] = File.new(value)
      end

      opts.on("--token VALUE", "The task token") do |value|
        options[:token] = value
      end

      help_switch.call(opts)
    end,
  }

  global = OptionParser.new do |opts|
    opts.banner = "Usage: #{$0} [command] [options]"
    opts.separator ""
    opts.separator "Commands (#{Simplerubysteps::VERSION}):"
    opts.separator "    deploy        Create Step Functions State Machine"
    opts.separator "    destroy       Delete Step Functions State Machine"
    opts.separator "    log           Continuously prints Lambda function log output"
    opts.separator "    start         Start State Machine execution"
    opts.separator "    task-success  Continue Start State Machine execution"
    opts.separator ""

    opts.on_tail("-h", "--help", "Display this help message") do
      puts opts
      exit
    end
  end

  begin
    # order! stops at the first non-option argument, which is the command.
    global.order!(ARGV)
    command = ARGV.shift
    options[:command] = command
    subcommands.fetch(command).parse!(ARGV)
  rescue KeyError
    puts "Unknown command: '#{command}'"
    puts
    puts global
    exit 1
  rescue OptionParser::ParseError => error
    puts error.message
    # When the global options themselves were invalid, no command has been
    # read yet; fall back to the global usage instead of failing the lookup.
    puts subcommands.fetch(command, global)
    exit 1
  end

  case options[:command]
  when "deploy"
    deploy
  when "start"
    start options[:wait], options[:input]
  when "log"
    log options[:extract_pattern]
  when "task-success"
    send_task_success options[:token], options[:input]
  when "destroy"
    destroy
  end
end

#send_task_success(task_token, output = $stdin) ⇒ Object



328
329
330
331
332
333
334
335
336
337
# File 'lib/simplerubysteps/tool.rb', line 328

# Reports a successful task result for the given task token and prints the
# client's response as JSON.
#
# @param task_token [String] token of the waiting task
# @param output [IO] stream to read the JSON task output from
# @raise [RuntimeError] when no token is given
def send_task_success(task_token, output = $stdin)
  raise "No token" unless task_token

  # Parsing and re-serializing makes malformed JSON fail before the API call.
  normalized_output = JSON.parse(output.read).to_json
  response = @states_client.send_task_success(task_token: task_token, output: normalized_output)

  puts response.to_json
end

#stack_create(stack_name, template, parameters) ⇒ Object



104
105
106
107
108
# File 'lib/simplerubysteps/tool.rb', line 104

# Creates a CloudFormation stack and blocks until creation has completed.
#
# @param stack_name [String] name of the stack to create
# @param template [String] template body
# @param parameters [Hash] template parameter values by parameter key
# @return [Hash, nil] the outputs of the created stack, as returned by
#   stack_outputs
def stack_create(stack_name, template, parameters)
  request = stack_params(stack_name, template, parameters)
  @cloudformation_client.create_stack(request)
  @cloudformation_client.wait_until(:stack_create_complete, stack_name: stack_name)

  stack_outputs(stack_name)
end

#stack_name_from_current_dir ⇒ Object



159
160
161
# File 'lib/simplerubysteps/tool.rb', line 159

# Derives the stack name from the working directory.
#
# @return [String] the last path component of the current directory
def stack_name_from_current_dir
  File.basename(Dir.pwd)
end

#stack_outputs(stack_name) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/simplerubysteps/tool.rb', line 74

# Reads the outputs of a CloudFormation stack.
#
# @param stack_name [String] name of the stack to describe
# @return [Hash{String => String}, nil] output values by output key, or nil
#   when the service reports that the stack does not exist
# @raise [Aws::CloudFormation::Errors::ServiceError] for any other service error
def stack_outputs(stack_name)
  description = @cloudformation_client.describe_stacks(stack_name: stack_name)

  description.stacks.first.outputs.each_with_object({}) do |entry, by_key|
    by_key[entry.output_key] = entry.output_value
  end
rescue Aws::CloudFormation::Errors::ServiceError => error
  # A missing stack is an expected condition; everything else is passed on.
  raise error unless error.message =~ /Stack .* does not exist/

  nil
end

#stack_params(stack_name, template, parameters) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/simplerubysteps/tool.rb', line 88

# Builds the request hash shared by the create_stack and update_stack calls.
#
# @param stack_name [String] name of the stack
# @param template [String] template body
# @param parameters [Hash] template parameter values by parameter key
# @return [Hash] request with :stack_name, :template_body, :capabilities
#   and :parameters (one key/value entry per given parameter)
def stack_params(stack_name, template, parameters)
  parameter_entries = parameters.map do |key, value|
    { parameter_key: key, parameter_value: value }
  end

  {
    stack_name: stack_name,
    template_body: template,
    capabilities: ["CAPABILITY_IAM", "CAPABILITY_NAMED_IAM"],
    parameters: parameter_entries,
  }
end

#stack_update(stack_name, template, parameters) ⇒ Object



110
111
112
113
114
115
116
117
118
119
# File 'lib/simplerubysteps/tool.rb', line 110

# Updates a CloudFormation stack and blocks until the update has completed.
#
# @param stack_name [String] name of the stack to update
# @param template [String] template body
# @param parameters [Hash] template parameter values by parameter key
# @return [Hash] the stack outputs; when the service reports that there is
#   nothing to update, the outputs additionally carry +:no_update => true+
# @raise [Aws::CloudFormation::Errors::ServiceError] for any other service error
def stack_update(stack_name, template, parameters)
  request = stack_params(stack_name, template, parameters)
  @cloudformation_client.update_stack(request)
  @cloudformation_client.wait_until(:stack_update_complete, stack_name: stack_name)

  stack_outputs(stack_name)
rescue Aws::CloudFormation::Errors::ServiceError => error
  # Only the "nothing changed" answer is tolerated; re-raise everything else.
  raise unless error.message =~ /No updates are to be performed/

  stack_outputs(stack_name).merge({ :no_update => true })
end

#start(wait = true, input = $stdin) ⇒ Object



304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
# File 'lib/simplerubysteps/tool.rb', line 304

# Starts an execution of the deployed state machine and prints the result
# as JSON.
#
# STANDARD machines are started asynchronously; with +wait+ the final
# execution description is printed, otherwise the start response. EXPRESS
# machines are executed synchronously.
#
# @param wait [Boolean] wait for a STANDARD execution to finish
# @param input [IO] stream to read the JSON execution input from
# @raise [RuntimeError] when the stack does not exist or reports an
#   unknown state machine type
def start(wait = true, input = $stdin)
  outputs = stack_outputs(stack_name_from_current_dir)
  raise "State Machine is not deployed" unless outputs

  state_machine_arn = outputs["StepFunctionsStateMachineARN"]
  input_json = JSON.parse(input.read).to_json
  machine_type = outputs["StateMachineType"]

  case machine_type
  when "STANDARD"
    started = start_async_execution(state_machine_arn, input_json)
    result = wait ? wait_for_async_execution_completion(started.execution_arn) : started
    puts result.to_json
  when "EXPRESS"
    puts start_sync_execution(state_machine_arn, input_json).to_json
  else
    raise "Unknown state machine type: #{machine_type}"
  end
end

#start_async_execution(state_machine_arn, input) ⇒ Object



276
277
278
279
280
281
# File 'lib/simplerubysteps/tool.rb', line 276

# Starts an execution via the client's start_execution call.
#
# @param state_machine_arn [String] ARN of the state machine to run
# @param input [String] execution input as a JSON string
# @return [Object] the client's start_execution response
def start_async_execution(state_machine_arn, input)
  @states_client.start_execution(state_machine_arn: state_machine_arn, input: input)
end

#start_sync_execution(state_machine_arn, input) ⇒ Object



269
270
271
272
273
274
# File 'lib/simplerubysteps/tool.rb', line 269

# Runs an execution via the client's start_sync_execution call.
#
# @param state_machine_arn [String] ARN of the state machine to run
# @param input [String] execution input as a JSON string
# @return [Object] the client's start_sync_execution response
def start_sync_execution(state_machine_arn, input)
  @states_client.start_sync_execution(state_machine_arn: state_machine_arn, input: input)
end

#tail_follow_logs(log_group_name, extract_pattern = nil) ⇒ Object

FIXME too hacky and not really working



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/simplerubysteps/tool.rb', line 22

# Polls all log streams of a log group in an endless loop and prints new
# events, or waits for the first event matching a pattern.
#
# Without +extract_pattern+ every event is printed as
# "<UTC time> - <stream name> - <message>". With +extract_pattern+ nothing
# is printed until an event message matches the pattern; then the first
# capture group is printed and the process exits.
#
# The method only returns by exiting the process (on a match or on SIGINT).
#
# @param log_group_name [String] name of the CloudWatch Logs log group
# @param extract_pattern [String, nil] regular expression source; it is
#   interpolated into a Regexp as-is
def tail_follow_logs(log_group_name, extract_pattern = nil) # FIXME too hacky and not really working
  # Exit on Ctrl-C.
  Signal.trap("INT") do
    exit
  end

  # Milliseconds since the epoch; events with an older timestamp are skipped.
  first_event_time = Time.now.to_i * 1000

  # Forward token per log stream name, so each poll continues where the
  # previous one stopped.
  next_tokens = {}
  first_round = true
  loop do
    log_streams = @logs_client.describe_log_streams(
      log_group_name: log_group_name,
      order_by: "LastEventTime",
      descending: true,
    ).log_streams

    log_streams.each do |log_stream|
      get_log_events_params = {
        log_group_name: log_group_name,
        log_stream_name: log_stream.log_stream_name,
      }

      if next_tokens.key?(log_stream.log_stream_name)
        get_log_events_params[:next_token] = next_tokens[log_stream.log_stream_name]
      else
        # Stream not read yet: on the first round start at its last event,
        # on later rounds (stream appeared meanwhile) start at the time this
        # method was called.
        get_log_events_params[:start_time] = first_round ? log_stream.last_event_timestamp : first_event_time
      end

      response = @logs_client.get_log_events(get_log_events_params)

      response.events.each do |event|
        if event.timestamp >= first_event_time
          if extract_pattern
            if /#{extract_pattern}/ =~ event.message
              # $1 is the first capture group of the match above.
              puts $1
              exit
            end
          else
            puts "#{Time.at(event.timestamp / 1000).utc} - #{log_stream.log_stream_name} - #{event.message}"
          end
        end
      end

      next_tokens[log_stream.log_stream_name] = response.next_forward_token
    end

    # Pause between polling rounds.
    sleep 5

    first_round = false
  end
end

#upload_file_to_s3(bucket, key, file_path) ⇒ Object



129
130
131
132
133
# File 'lib/simplerubysteps/tool.rb', line 129

# Uploads a file from disk to S3.
#
# @param bucket [String] target bucket
# @param key [String] target object key
# @param file_path [String] path of the file to upload; opened in binary mode
def upload_file_to_s3(bucket, key, file_path)
  File.open(file_path, "rb") { |io| upload_to_s3(bucket, key, io) }
end

#upload_to_s3(bucket, key, body) ⇒ Object



121
122
123
124
125
126
127
# File 'lib/simplerubysteps/tool.rb', line 121

# Stores an object in S3 via the client's put_object call.
#
# @param bucket [String] target bucket
# @param key [String] target object key
# @param body [String, IO] object content
# @return [Object] the client's put_object response
def upload_to_s3(bucket, key, body)
  @s3_client.put_object(bucket: bucket, key: key, body: body)
end

#wait_for_async_execution_completion(execution_arn) ⇒ Object



289
290
291
292
293
294
295
296
297
298
299
300
301
302
# File 'lib/simplerubysteps/tool.rb', line 289

# Polls an execution every five seconds until it reaches a final status.
#
# ABORTED is treated as final alongside SUCCEEDED, FAILED and TIMED_OUT;
# without it a stopped execution would be polled forever.
#
# @param execution_arn [String] ARN of the execution to wait for
# @return [Object] the last describe_execution response
def wait_for_async_execution_completion(execution_arn)
  loop do
    response = describe_execution(execution_arn)
    return response if %w[SUCCEEDED FAILED TIMED_OUT ABORTED].include?(response.status)

    sleep 5
  end
end

#workflow_files ⇒ Object



163
164
165
# File 'lib/simplerubysteps/tool.rb', line 163

# Lists the Ruby files below the current directory.
#
# @return [Hash{String => String}] relative name mapped to file path, as
#   produced by dir_files
def workflow_files
  dir_files(".", "**/*.rb")
end