Class: Hako::Schedulers::Ecs

Inherits:
Hako::Scheduler
Defined in:
lib/hako/schedulers/ecs.rb

Defined Under Namespace

Classes: NoTasksStarted

Constant Summary

DEFAULT_CLUSTER =
'default'
DEFAULT_FRONT_PORT =
10000
TASK_ID_RE =
/\(task ([\h-]+)\)\.\z/

Instance Attribute Summary

Instance Method Summary

Methods inherited from Hako::Scheduler

#initialize, #validation_error!

Constructor Details

This class inherits a constructor from Hako::Scheduler

Instance Attribute Details

#task ⇒ Object (readonly)

Returns the value of attribute task.



22
23
24
# File 'lib/hako/schedulers/ecs.rb', line 22

# Reader for the ECS task started by the most recent #oneshot call.
# Yields nil before any oneshot has run and after #wait_for_oneshot_finish
# has cleared it.
#
# @return [Aws::ECS::Types::Task, nil]
def task
  @task if defined?(@task)
end

Instance Method Details

#call_rollback_started(task_definition, target_definition) ⇒ nil (private)

Parameters:

  • task_definition (Aws::ECS::Types::TaskDefinition)
  • target_definition (String)

Returns:

  • (nil)


894
895
896
897
898
899
900
901
902
903
# File 'lib/hako/schedulers/ecs.rb', line 894

# Tells every script that a rollback is starting, passing the image of the
# 'app' container in the current task definition and in the rollback target.
# When either side lacks an 'app' container, logs a warning and skips the
# scripts.
#
# @param task_definition [Aws::ECS::Types::TaskDefinition] currently deployed definition
# @param target_definition [String] task definition the service rolls back to
# @return [nil]
def call_rollback_started(task_definition, target_definition)
  pick_app = ->(defs) { defs.find { |c| c.name == 'app' } }
  current_app = pick_app.call(task_definition.container_definitions)
  target_task_definition = ecs_client.describe_task_definition(task_definition: target_definition).task_definition
  target_app = pick_app.call(target_task_definition.container_definitions)

  if current_app.nil? || target_app.nil?
    Hako.logger.warn("Cannot find image_tag. current_app=#{current_app.inspect} target_app=#{target_app.inspect}. Skip calling Script#rollback_started")
  else
    @scripts.each { |script| script.rollback_started(current_app.image, target_app.image) }
  end
  nil
end

#configure(options) ⇒ Object

Parameters:

  • options (Hash<String, Object>)


25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/hako/schedulers/ecs.rb', line 25

# Reads the scheduler section of the application definition into instance
# state.
#
# @param options [Hash<String, Object>] scheduler options (string keys)
# @raise via #validation_error! when 'region' is missing or when both 'elb'
#   and 'elb_v2' are given
def configure(options)
  @cluster = options.fetch('cluster', DEFAULT_CLUSTER)
  @desired_count = options.fetch('desired_count', nil)
  @region = options.fetch('region') { validation_error!('region must be set') }
  @role = options.fetch('role', nil)
  @task_role_arn = options.fetch('task_role_arn', nil)

  # Classic ELB and ELBv2 settings are mutually exclusive.
  @ecs_elb_options = options.fetch('elb', nil)
  @ecs_elb_v2_options = options.fetch('elb_v2', nil)
  validation_error!('Cannot specify both elb and elb_v2') if @ecs_elb_options && @ecs_elb_v2_options
  # Dynamic host ports are the default unless a classic ELB is configured.
  @dynamic_port_mapping = options.fetch('dynamic_port_mapping', @ecs_elb_options.nil?)

  @autoscaling = EcsAutoscaling.new(options.fetch('autoscaling'), dry_run: @dry_run) if options.key?('autoscaling')
  @autoscaling_group_for_oneshot = options.fetch('autoscaling_group_for_oneshot', nil)

  # Always carries both keys; a value is nil when not configured.
  @deployment_configuration = %i[maximum_percent minimum_healthy_percent].each_with_object({}) do |key, conf|
    conf[key] = options.dig('deployment_configuration', key.to_s)
  end
  @placement_constraints = options.fetch('placement_constraints', [])
  @placement_strategy = options.fetch('placement_strategy', [])

  # Per-run state filled in while waiting for a oneshot task.
  @started_at = nil
  @container_instance_arn = nil
end

#create_definition(name, container) ⇒ Hash (private)

Parameters:

  • name (String)
  • container (Container)

Returns:

  • (Hash)


461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
# File 'lib/hako/schedulers/ecs.rb', line 461

# Builds the ECS container definition hash for one container. The container's
# env hash becomes a list of { name:, value: } pairs, and every container is
# marked essential.
#
# @param name [String] container name
# @param container [Container] configured container
# @return [Hash] container definition with symbol keys
def create_definition(name, container)
  env_pairs = container.env.map { |key, value| { name: key, value: value } }
  definition = {
    name: name,
    image: container.image_tag,
    cpu: container.cpu,
    memory: container.memory,
    memory_reservation: container.memory_reservation,
    links: container.links,
    port_mappings: container.port_mappings,
    essential: true,
  }
  definition[:environment] = env_pairs
  %i[docker_labels mount_points command privileged volumes_from user log_configuration].each do |attribute|
    definition[attribute] = container.public_send(attribute)
  end
  definition
end

#create_definitions(containers) ⇒ Array<Hash> (private)

Parameters:

  • containers (Hash<String, Container>)

Returns:

  • (Array<Hash>)


426
427
428
429
430
# File 'lib/hako/schedulers/ecs.rb', line 426

# Builds one ECS container definition per configured container.
#
# @param containers [Hash<String, Container>] container name => container
# @return [Array<Hash>] definitions in the iteration order of +containers+
def create_definitions(containers)
  containers.each_with_object([]) do |(container_name, container), defs|
    defs << create_definition(container_name, container)
  end
end

#create_initial_service(task_definition_arn, front_port) ⇒ Aws::ECS::Types::Service (private)

Parameters:

  • task_definition_arn (String)
  • front_port (Fixnum)

Returns:

  • (Aws::ECS::Types::Service)


614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
# File 'lib/hako/schedulers/ecs.rb', line 614

# Creates the ECS service on first deploy, starting at desired_count 0. When
# the ELB client reports a load balancer for +front_port+, its attributes are
# updated and it is attached to the 'front' container on port 80.
#
# @param task_definition_arn [String]
# @param front_port [Fixnum]
# @return [Aws::ECS::Types::Service]
def create_initial_service(task_definition_arn, front_port)
  params = {
    cluster: @cluster,
    service_name: @app_id,
    task_definition: task_definition_arn,
    desired_count: 0,
    role: @role,
    deployment_configuration: @deployment_configuration,
    placement_constraints: @placement_constraints,
    placement_strategy: @placement_strategy,
  }
  # Always go through the memoizing accessor instead of reading
  # @ecs_elb_client, which only exists as a side effect of an earlier call.
  elb_client = ecs_elb_client
  if elb_client.find_or_create_load_balancer(front_port)
    elb_client.modify_attributes
    params[:load_balancers] = [
      elb_client.load_balancer_params_for_service.merge(container_name: 'front', container_port: 80),
    ]
  end
  ecs_client.create_service(params).service
end

#deploy(containers) ⇒ nil

Parameters:

  • containers (Hash<String, Container>)

Returns:

  • (nil)


54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/hako/schedulers/ecs.rb', line 54

# Deploys the application as an ECS service.
#
# In dry-run mode it only prints an equivalent `docker run` command per
# container and applies autoscaling (constructed with the same dry_run flag).
# Otherwise it:
# - registers a new task definition revision if the definitions changed,
# - creates the service if it does not exist yet,
# - updates the service,
# - waits for the new deployment to become ready.
#
# @param containers [Hash<String, Container>] container name => container
# @raise [Error] 'Deployment cancelled' when the updated service does not
#   become ready; if this deploy registered a new revision, the service is
#   first pointed back at the previous task definition and the new one is
#   deregistered
def deploy(containers)
  unless @desired_count
    validation_error!('desired_count must be set')
  end
  front_port = determine_front_port
  @scripts.each { |script| script.deploy_started(containers, front_port) }
  definitions = create_definitions(containers)

  if @dry_run
    definitions.each do |d|
      print_definition_in_cli_format(d)
    end
    if @autoscaling
      @autoscaling.apply(Aws::ECS::Types::Service.new(cluster_arn: @cluster, service_name: @app_id))
    end
  else
    # Captured before any change so rollback knows the previous task definition.
    current_service = describe_service
    task_definition = register_task_definition(definitions)
    task_definition_changed = task_definition != :noop
    if task_definition_changed
      Hako.logger.info "Registered task definition: #{task_definition.task_definition_arn}"
    else
      Hako.logger.info "Task definition isn't changed"
      # Nothing was registered; look up the family's current revision by name.
      task_definition = ecs_client.describe_task_definition(task_definition: @app_id).task_definition
    end
    unless current_service
      current_service = create_initial_service(task_definition.task_definition_arn, front_port)
    end
    service = update_service(current_service, task_definition.task_definition_arn)
    if service == :noop
      Hako.logger.info "Service isn't changed"
      if @autoscaling
        @autoscaling.apply(current_service)
      end
    else
      Hako.logger.info "Updated service: #{service.service_arn}"
      if @autoscaling
        @autoscaling.apply(service)
      end
      unless wait_for_ready(service)
        # Only roll back (and deregister) when this deploy registered a new revision.
        if task_definition_changed
          Hako.logger.error("Rolling back to #{current_service.task_definition}")
          update_service(service, current_service.task_definition)
          ecs_client.deregister_task_definition(task_definition: service.task_definition)
          Hako.logger.debug "Deregistered #{service.task_definition}"
        end
        raise Error.new('Deployment cancelled')
      end
    end
    Hako.logger.info 'Deployment completed'
  end
end

#describe_service ⇒ Aws::ECS::Types::Service? (private)

Returns:

  • (Aws::ECS::Types::Service, nil)


297
298
299
300
301
302
# File 'lib/hako/schedulers/ecs.rb', line 297

# Looks up this application's ECS service in the configured cluster.
#
# @return [Aws::ECS::Types::Service, nil] the service, or nil when none is
#   returned or its status is INACTIVE
def describe_service
  found = ecs_client.describe_services(cluster: @cluster, services: [@app_id]).services.first
  return nil if found.nil? || found.status == 'INACTIVE'

  found
end

#determine_front_port ⇒ Fixnum (private)

Returns:

  • (Fixnum)


305
306
307
308
309
310
311
312
313
314
315
316
317
318
# File 'lib/hako/schedulers/ecs.rb', line 305

# Chooses the host port for the 'front' container.
#
# Returns 0 with dynamic port mapping and DEFAULT_FRONT_PORT in dry-run mode.
# Otherwise it reuses the existing service's front port, or allocates a new
# one when the service does not exist.
#
# @return [Fixnum]
def determine_front_port
  return 0 if @dynamic_port_mapping
  return DEFAULT_FRONT_PORT if @dry_run

  existing = describe_service
  existing ? find_front_port(existing) : new_front_port
end

#different_definition?(expected_container, actual_container) ⇒ Boolean (private)

Parameters:

  • expected_container (Hash)
  • actual_container (Aws::ECS::Types::ContainerDefinition)

Returns:

  • (Boolean)


405
406
407
# File 'lib/hako/schedulers/ecs.rb', line 405

# Compares a desired container definition with the registered one by
# delegating to EcsDefinitionComparator.
#
# @param expected_container [Hash]
# @param actual_container [Aws::ECS::Types::ContainerDefinition]
# @return [Boolean]
def different_definition?(expected_container, actual_container)
  comparator = EcsDefinitionComparator.new(expected_container)
  comparator.different?(actual_container)
end

#different_volumes?(actual_volumes) ⇒ Boolean (private)

Parameters:

  • actual_volumes (Hash<String, Hash<String, String>>)

Returns:

  • (Boolean)


385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
# File 'lib/hako/schedulers/ecs.rb', line 385

# Compares the registered task definition volumes with the configured
# @volumes. They differ on a count mismatch, an unknown volume name, or a
# different host source_path.
#
# @param actual_volumes [Array<Aws::ECS::Types::Volume>]
# @return [Boolean]
def different_volumes?(actual_volumes)
  return true unless @volumes.size == actual_volumes.size

  actual_volumes.any? do |actual_volume|
    expected_volume = @volumes[actual_volume.name]
    expected_volume.nil? || expected_volume['source_path'] != actual_volume.host.source_path
  end
end

#ec2_client ⇒ Aws::EC2::Client (private)

Returns:

  • (Aws::EC2::Client)


282
283
284
# File 'lib/hako/schedulers/ecs.rb', line 282

# Memoized EC2 client for the configured region.
#
# @return [Aws::EC2::Client]
def ec2_client
  return @ec2_client if @ec2_client

  @ec2_client = Aws::EC2::Client.new(region: @region)
end

#ecs_client ⇒ Aws::ECS::Client (private)

Returns:

  • (Aws::ECS::Client)


277
278
279
# File 'lib/hako/schedulers/ecs.rb', line 277

# Memoized ECS client for the configured region.
#
# @return [Aws::ECS::Client]
def ecs_client
  return @ecs_client if @ecs_client

  @ecs_client = Aws::ECS::Client.new(region: @region)
end

#ecs_elb_client ⇒ EcsElb, EcsElbV2 (private)

Returns:

  • (EcsElb, EcsElbV2)


287
288
289
290
291
292
293
294
# File 'lib/hako/schedulers/ecs.rb', line 287

# Memoized load balancer helper. Uses EcsElb when 'elb' options are
# configured, and otherwise EcsElbV2 (built even when the elb_v2 options are
# nil).
#
# @return [EcsElb, EcsElbV2]
def ecs_elb_client
  return @ecs_elb_client if @ecs_elb_client

  elb_class, elb_options = @ecs_elb_options ? [EcsElb, @ecs_elb_options] : [EcsElbV2, @ecs_elb_v2_options]
  @ecs_elb_client = elb_class.new(@app_id, @region, elb_options, dry_run: @dry_run)
end

#extract_task_id(message) ⇒ String? (private)

Parameters:

  • message (String)

Returns:

  • (String, nil)


710
711
712
# File 'lib/hako/schedulers/ecs.rb', line 710

# Pulls the task ID out of a service event message that ends with
# "(task <id>)." using TASK_ID_RE.
#
# @param message [String]
# @return [String, nil] the task ID, or nil when the message does not match
def extract_task_id(message)
  message[TASK_ID_RE, 1]
end

#find_front_port(service) ⇒ Fixnum? (private)

Parameters:

  • service (Aws::ECS::Types::Service)

Returns:

  • (Fixnum, nil)


344
345
346
347
348
349
350
351
352
353
# File 'lib/hako/schedulers/ecs.rb', line 344

# Reads the host port published by the 'front' container of +service+'s
# current task definition.
#
# @param service [Aws::ECS::Types::Service]
# @return [Fixnum, nil] host port of the first port mapping; nil when there is
#   no 'front' container or it has no port mappings
def find_front_port(service)
  task_definition = ecs_client.describe_task_definition(task_definition: service.task_definition).task_definition
  front = task_definition.container_definitions.find { |c| c.name == 'front' }
  return nil unless front

  # A front container without port mappings yields nil instead of raising.
  front.port_mappings.first&.host_port
end

#find_latest_event_id(events) ⇒ String? (private)

Parameters:

  • events (Array<Aws::ECS::Types::ServiceEvent>)

Returns:

  • (String, nil)


699
700
701
702
703
704
705
# File 'lib/hako/schedulers/ecs.rb', line 699

# ID of the first event in +events+, which this scheduler treats as the most
# recent one.
#
# @param events [Array<Aws::ECS::Types::ServiceEvent>]
# @return [String, nil] nil when there are no events
def find_latest_event_id(events)
  return nil if events.empty?

  events.first.id
end

#find_rollback_target(task_definition) ⇒ String (private)

Parameters:

  • task_definition (Aws::ECS::Types::TaskDefinition)

Returns:

  • (String)


726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
# File 'lib/hako/schedulers/ecs.rb', line 726

# Finds the ARN of the ACTIVE revision listed right after +task_definition+
# in the family's revisions, listed newest first.
#
# @param task_definition [Aws::ECS::Types::TaskDefinition] currently deployed revision
# @return [String] ARN of the revision to roll back to
# @raise [RuntimeError] when +task_definition+ is not ACTIVE or no older
#   ACTIVE revision follows it
def find_rollback_target(task_definition)
  raise 'Cannot find rollback target from INACTIVE task_definition!' unless task_definition.status == 'ACTIVE'

  current_arn = task_definition.task_definition_arn
  seen_current = false
  pages = ecs_client.list_task_definitions(family_prefix: task_definition.family, status: 'ACTIVE', sort: 'DESC')
  pages.each do |page|
    page.task_definition_arns.each do |arn|
      return arn if seen_current

      seen_current = true if arn == current_arn
    end
  end

  raise "Unable to find rollback target. #{current_arn} is INACTIVE?"
end

#has_capacity?(task_definition, container_instances) ⇒ Boolean (private)

Parameters:

  • task_definition (Aws::ECS::Types::TaskDefinition)
  • container_instances (Array<Aws::ECS::Types::ContainerInstance>)

Returns:

  • (Boolean)


794
795
796
797
798
799
800
801
# File 'lib/hako/schedulers/ecs.rb', line 794

# Checks whether at least one container instance has enough remaining CPU and
# memory for all containers of +task_definition+ combined.
#
# Container definitions may omit cpu, or set only memory_reservation with no
# hard memory limit. The API then returns nil for those fields, so cpu counts
# as 0 and memory falls back to the reservation instead of failing with nil
# arithmetic.
#
# @param task_definition [Aws::ECS::Types::TaskDefinition]
# @param container_instances [Array<Aws::ECS::Types::ContainerInstance>]
# @return [Boolean]
def has_capacity?(task_definition, container_instances)
  required_cpu, required_memory = task_definition.container_definitions.inject([0, 0]) do |(cpu, memory), d|
    [cpu + d.cpu.to_i, memory + (d.memory || d.memory_reservation).to_i]
  end
  container_instances.any? do |ci|
    cpu = ci.remaining_resources.find { |r| r.name == 'CPU' }.integer_value
    memory = ci.remaining_resources.find { |r| r.name == 'MEMORY' }.integer_value
    required_cpu <= cpu && required_memory <= memory
  end
end

#new_front_port ⇒ Fixnum (private)

Returns:

  • (Fixnum)


321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
# File 'lib/hako/schedulers/ecs.rb', line 321

# Allocates a front port one above the highest front port used by any
# non-INACTIVE service in the cluster. Uses DEFAULT_FRONT_PORT when no service
# has one.
#
# @return [Fixnum]
def new_front_port
  front_ports = []
  ecs_client.list_services(cluster: @cluster).each do |page|
    next if page.service_arns.empty?

    ecs_client.describe_services(cluster: @cluster, services: page.service_arns).services.each do |svc|
      next if svc.status == 'INACTIVE'

      port = find_front_port(svc)
      front_ports << port if port
    end
  end
  highest = ([-1] + front_ports).max
  highest == -1 ? DEFAULT_FRONT_PORT : highest + 1
end

#on_no_tasks_started(task_definition) ⇒ Boolean (private)

Returns true if the capacity is reserved.

Parameters:

  • task_definition (Aws::ECS::Types::TaskDefinition)

Returns:

  • (Boolean)

    true if the capacity is reserved



747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
# File 'lib/hako/schedulers/ecs.rb', line 747

# Called when a oneshot task could not be placed. When
# autoscaling_group_for_oneshot is configured, it waits for (and if needed
# creates) spare capacity by incrementing the Auto Scaling group's desired
# capacity.
#
# @param task_definition [Aws::ECS::Types::TaskDefinition]
# @return [Boolean] true once some container instance has room for the task;
#   false when no Auto Scaling group is configured
# @raise [Error] when the configured Auto Scaling group does not exist
def on_no_tasks_started(task_definition)
  unless @autoscaling_group_for_oneshot
    return false
  end

  # Use the configured region, like the ECS and EC2 clients, rather than the
  # SDK's default region resolution.
  autoscaling = Aws::AutoScaling::Client.new(region: @region)
  loop do
    asg = autoscaling.describe_auto_scaling_groups(auto_scaling_group_names: [@autoscaling_group_for_oneshot]).auto_scaling_groups[0]
    unless asg
      raise Error.new("AutoScaling Group '#{@autoscaling_group_for_oneshot}' does not exist")
    end

    container_instances = ecs_client.list_container_instances(cluster: @cluster).flat_map do |page|
      # A cluster with no registered instances yields an empty page; skip it
      # instead of calling DescribeContainerInstances with no ARNs.
      if page.container_instance_arns.empty?
        []
      else
        ecs_client.describe_container_instances(cluster: @cluster, container_instances: page.container_instance_arns).container_instances
      end
    end
    if has_capacity?(task_definition, container_instances)
      Hako.logger.info("There's remaining capacity. Start retrying...")
      return true
    end

    # Check autoscaling group health
    current = asg.instances.count { |i| i.lifecycle_state == 'InService' }
    if asg.desired_capacity != current
      Hako.logger.debug("#{asg.auto_scaling_group_name} isn't in desired state. desired_capacity=#{asg.desired_capacity} in-service instances=#{current}")
      sleep 1
      next
    end

    # Check out-of-service instances
    out_instances = asg.instances.map(&:instance_id)
    container_instances.each do |ci|
      out_instances.delete(ci.ec2_instance_id)
    end
    unless out_instances.empty?
      Hako.logger.debug("There's instances that is running but not registered as container instances: #{out_instances}")
      sleep 1
      next
    end

    # Scale out
    desired = current + 1
    Hako.logger.info("Increment desired_capacity of #{asg.auto_scaling_group_name} from #{current} to #{desired}")
    autoscaling.set_desired_capacity(auto_scaling_group_name: asg.auto_scaling_group_name, desired_capacity: desired)
  end
end

#oneshot(containers, commands, env) ⇒ Integer

Parameters:

  • containers (Hash<String, Container>)
  • commands (Array<String>)
  • env (Hash<String, String>)

Returns:

  • (Integer)

    exit code of the app container (127 when it has none); 0 in dry-run mode


138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/hako/schedulers/ecs.rb', line 138

# Runs +commands+ once in the 'app' container as a standalone ECS task (family
# "<app_id>-oneshot") and waits for it to stop. The :essential flag is dropped
# from every container definition. In dry-run mode it only prints the
# equivalent `docker run` commands.
#
# @param containers [Hash<String, Container>]
# @param commands [Array<String>] command for the 'app' container
# @param env [Hash<String, String>] extra environment for the 'app' container
# @return [Integer] app exit code from #wait_for_oneshot_finish; 0 in dry-run
def oneshot(containers, commands, env)
  definitions = create_definitions(containers)
  definitions.each { |definition| definition.delete(:essential) }

  if @dry_run
    definitions.each do |definition|
      definition[:command] = commands if definition[:name] == 'app'
      print_definition_in_cli_format(definition, additional_env: env)
    end
    return 0
  end

  task_definition = register_task_definition_for_oneshot(definitions)
  if task_definition == :noop
    Hako.logger.info "Task definition isn't changed"
    task_definition = ecs_client.describe_task_definition(task_definition: "#{@app_id}-oneshot").task_definition
  else
    Hako.logger.info "Registered task definition: #{task_definition.task_definition_arn}"
  end
  @task = run_task(task_definition, commands, env)
  Hako.logger.info "Started task: #{@task.task_arn}"
  @scripts.each { |script| script.oneshot_started(self) }
  wait_for_oneshot_finish
end

#print_definition_in_cli_format(definition, additional_env: {}) ⇒ nil

Parameters:

  • definition (Hash)
  • additional_env (Hash<String, String>) (defaults to: {})

Returns:

  • (nil)


806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
# File 'lib/hako/schedulers/ecs.rb', line 806

# Prints a `docker run` command line that approximates +definition+ (used by
# dry-run deploy and oneshot).
#
# @param definition [Hash] container definition built by #create_definition
# @param additional_env [Hash<String, String>] extra environment variables;
#   they take priority over variables of the same name in +definition+
# @return [nil]
# @raise [RuntimeError] when a mount point names a volume missing from @volumes
def print_definition_in_cli_format(definition, additional_env: {})
  cmd = %w[docker run]
  cmd << '--name' << definition.fetch(:name)
  cmd << '--cpu-shares' << definition.fetch(:cpu)
  cmd << '--memory' << definition.fetch(:memory)
  definition.fetch(:links).each do |link|
    cmd << '--link' << link
  end
  # Like mount points and volumes_from, port mappings use symbol keys: these
  # hashes are passed as-is as the container_definitions of
  # register_task_definition.
  definition.fetch(:port_mappings).each do |port_mapping|
    cmd << '--publish' << "#{port_mapping.fetch(:host_port)}:#{port_mapping.fetch(:container_port)}"
  end
  definition.fetch(:docker_labels).each do |key, val|
    if key != 'cc.wanko.hako.version'
      cmd << '--label' << "#{key}=#{val}"
    end
  end
  definition.fetch(:mount_points).each do |mount_point|
    source_volume = mount_point.fetch(:source_volume)
    v = @volumes[source_volume]
    if v
      cmd << '--volume' << "#{v.fetch('source_path')}:#{mount_point.fetch(:container_path)}#{mount_point[:read_only] ? ':ro' : ''}"
    else
      raise "Could not find volume #{source_volume}"
    end
  end
  if definition[:privileged]
    cmd << '--privileged'
  end
  # Each volumes_from entry becomes `--volumes-from CONTAINER[:ro]`.
  definition.fetch(:volumes_from).each do |volumes_from|
    cmd << '--volumes-from' << "#{volumes_from.fetch(:source_container)}#{volumes_from[:read_only] ? ':ro' : ''}"
  end
  if definition[:user]
    cmd << '--user' << definition[:user]
  end

  cmd << "\\\n  "
  definition.fetch(:environment).each do |env|
    name = env.fetch(:name)
    value = env.fetch(:value)
    # additional_env (given in command line) has priority over env (declared in YAML)
    unless additional_env.key?(name)
      cmd << '--env' << "#{name}=#{value}"
      cmd << "\\\n  "
    end
  end
  additional_env.each do |name, value|
    cmd << '--env' << "#{name}=#{value}"
    cmd << "\\\n  "
  end

  cmd << definition.fetch(:image)
  if definition[:command]
    cmd << "\\\n  "
    cmd += definition[:command]
  end
  puts cmd.join(' ')
  nil
end

#register_task_definition(definitions) ⇒ Aws::ECS::Types::TaskDefinition, Symbol (private)

Parameters:

  • definitions (Array<Hash>)

Returns:

  • (Aws::ECS::Types::TaskDefinition, Symbol)


411
412
413
414
415
416
417
418
419
420
421
422
# File 'lib/hako/schedulers/ecs.rb', line 411

# Registers a new revision of the @app_id family when the definitions differ
# from the current revision.
#
# @param definitions [Array<Hash>] container definitions
# @return [Aws::ECS::Types::TaskDefinition, Symbol] the new revision, or :noop
#   when nothing changed
def register_task_definition(definitions)
  return :noop unless task_definition_changed?(@app_id, definitions)

  response = ecs_client.register_task_definition(
    family: @app_id,
    task_role_arn: @task_role_arn,
    container_definitions: definitions,
    volumes: volumes_definition,
  )
  response.task_definition
end

#register_task_definition_for_oneshot(definitions) ⇒ Aws::ECS::Types::TaskDefinition, Symbol (private)

Parameters:

  • definitions (Array<Hash>)

Returns:

  • (Aws::ECS::Types::TaskDefinition, Symbol)


434
435
436
437
438
439
440
441
442
443
444
445
446
# File 'lib/hako/schedulers/ecs.rb', line 434

# Registers a new revision of the "<app_id>-oneshot" family when the
# definitions differ from its current revision.
#
# @param definitions [Array<Hash>] container definitions
# @return [Aws::ECS::Types::TaskDefinition, Symbol] the new revision, or :noop
def register_task_definition_for_oneshot(definitions)
  family = "#{@app_id}-oneshot"
  return :noop unless task_definition_changed?(family, definitions)

  ecs_client.register_task_definition(
    family: family,
    task_role_arn: @task_role_arn,
    container_definitions: definitions,
    volumes: volumes_definition,
  ).task_definition
end

#remove ⇒ nil

Returns:

  • (nil)


238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/hako/schedulers/ecs.rb', line 238

# Deletes the service: first scales it to 0, then deletes it (only logged in
# dry-run mode). It also removes its autoscaling setup and always asks the ELB
# helper to destroy its load balancer.
def remove
  service = describe_service
  if service.nil?
    puts "Service #{@app_id} doesn't exist"
  else
    cluster_arn = service.cluster_arn
    service_arn = service.service_arn
    if @dry_run
      Hako.logger.info "ecs_client.update_service(cluster: #{cluster_arn}, service: #{service_arn}, desired_count: 0)"
      Hako.logger.info "ecs_client.delete_service(cluster: #{cluster_arn}, service: #{service_arn})"
    else
      ecs_client.update_service(cluster: cluster_arn, service: service_arn, desired_count: 0)
      ecs_client.delete_service(cluster: cluster_arn, service: service_arn)
      Hako.logger.info "#{service_arn} is deleted"
    end
    @autoscaling.remove(service) if @autoscaling
  end

  ecs_elb_client.destroy
end

#report_container_instance(container_instance_arn) ⇒ nil (private)

Parameters:

  • container_instance_arn (String)

Returns:

  • (nil)


583
584
585
586
# File 'lib/hako/schedulers/ecs.rb', line 583

# Logs which container instance (and EC2 instance) the oneshot task runs on.
#
# @param container_instance_arn [String]
# @return [nil] as documented; the logger's return value is no longer leaked
def report_container_instance(container_instance_arn)
  container_instance = ecs_client.describe_container_instances(cluster: @cluster, container_instances: [container_instance_arn]).container_instances[0]
  Hako.logger.info "Container instance is #{container_instance_arn} (#{container_instance.ec2_instance_id})"
  nil
end

#report_task_diagnostics(task) ⇒ nil (private)

Parameters:

  • task (Aws::ECS::Types::Task)

Returns:

  • (nil)


716
717
718
719
720
721
722
# File 'lib/hako/schedulers/ecs.rb', line 716

# Logs a failed task's status and stop reason, then each container's status,
# exit code and reason, ordered by container name.
#
# @param task [Aws::ECS::Types::Task]
# @return [nil] as documented; the sorted container array is no longer returned
def report_task_diagnostics(task)
  Hako.logger.error("task_definition_arn=#{task.task_definition_arn} last_status=#{task.last_status}")
  Hako.logger.error("  stopped_reason: #{task.stopped_reason}")
  task.containers.sort_by(&:name).each do |container|
    Hako.logger.error("    Container #{container.name}: last_status=#{container.last_status} exit_code=#{container.exit_code.inspect} reason=#{container.reason.inspect}")
  end
  nil
end

#rollback ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/hako/schedulers/ecs.rb', line 107

# Rolls the service back to the revision that precedes the current one in the
# family's ACTIVE revisions (see #find_rollback_target). The current revision
# is deregistered, then it waits for the service to settle.
# Exits with status 1 when the service does not exist.
#
# @raise [Error] when the service does not become ready after the rollback;
#   previously this case was still reported as "Deployment completed"
def rollback
  current_service = describe_service
  unless current_service
    Hako.logger.error 'Unable to find service'
    exit 1
  end

  task_definition = ecs_client.describe_task_definition(task_definition: current_service.task_definition).task_definition
  current_definition = "#{task_definition.family}:#{task_definition.revision}"
  target_definition = find_rollback_target(task_definition)
  Hako.logger.info "Current task defintion is #{current_definition}. Rolling back to #{target_definition}"
  call_rollback_started(task_definition, target_definition)

  if @dry_run
    Hako.logger.info 'Deployment completed (dry-run)'
  else
    service = ecs_client.update_service(cluster: current_service.cluster_arn, service: current_service.service_arn, task_definition: target_definition).service
    Hako.logger.info "Updated service: #{service.service_arn}"

    deregistered_definition = ecs_client.deregister_task_definition(task_definition: current_definition).task_definition
    Hako.logger.debug "Deregistered #{deregistered_definition.task_definition_arn}"

    # wait_for_ready returns false on timeout or when started tasks stop;
    # fail like #deploy does instead of reporting success.
    unless wait_for_ready(service)
      raise Error.new('Rollback failed')
    end
    Hako.logger.info 'Deployment completed'
  end
end

#run_task(task_definition, commands, env) ⇒ Aws::ECS::Types::Task (private)

Parameters:

  • task_definition (Aws::ECS::Types::TaskDefinition)
  • commands (Array<String>)
  • env (Hash<String, String>)

Returns:

  • (Aws::ECS::Types::Task)


487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
# File 'lib/hako/schedulers/ecs.rb', line 487

# Starts one task of +task_definition+ with +commands+ and +env+ overriding
# the 'app' container.
#
# When ECS reports that the cluster has no container instances, or starts no
# task, #on_no_tasks_started is consulted. If it returns true (capacity is
# available, possibly after scaling out the oneshot Auto Scaling group), the
# whole method body is retried via `retry`; otherwise the error is re-raised.
#
# @param task_definition [Aws::ECS::Types::TaskDefinition]
# @param commands [Array<String>] command override for 'app'
# @param env [Hash<String, String>] environment override for 'app'
# @return [Aws::ECS::Types::Task] the first started task
# @raise [NoTasksStarted, Aws::ECS::Errors::InvalidParameterException]
def run_task(task_definition, commands, env)
  environment = env.map { |k, v| { name: k, value: v } }
  result = ecs_client.run_task(
    cluster: @cluster,
    task_definition: task_definition.task_definition_arn,
    overrides: {
      container_overrides: [
        {
          name: 'app',
          command: commands,
          environment: environment,
        },
      ],
    },
    count: 1,
    started_by: 'hako oneshot',
  )
  # Placement failures are logged; they only abort the run if no task started.
  result.failures.each do |failure|
    Hako.logger.error("#{failure.arn} #{failure.reason}")
  end
  if result.tasks.empty?
    raise NoTasksStarted.new('No tasks started')
  end
  result.tasks[0]
rescue Aws::ECS::Errors::InvalidParameterException => e
  # Only the "no container instances" variant is recoverable by adding capacity.
  if e.message == 'No Container Instances were found in your cluster.' && on_no_tasks_started(task_definition)
    retry
  else
    raise e
  end
rescue NoTasksStarted => e
  if on_no_tasks_started(task_definition)
    retry
  else
    raise e
  end
end

#service_changed?(service, params) ⇒ Boolean (private)

Parameters:

  • service (Aws::ECS::Types::Service)
  • params (Hash)

Returns:

  • (Boolean)


637
638
639
# File 'lib/hako/schedulers/ecs.rb', line 637

# Compares the desired update_service +params+ with the live service by
# delegating to EcsServiceComparator.
#
# @param service [Aws::ECS::Types::Service]
# @param params [Hash]
# @return [Boolean]
def service_changed?(service, params)
  comparator = EcsServiceComparator.new(params)
  comparator.different?(service)
end

#status ⇒ nil

Returns:

  • (nil)


177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/hako/schedulers/ecs.rb', line 177

# Prints a human-readable status report for the service, in this order:
# - the first attached load balancer (if any),
# - deployments,
# - tasks with the EC2 instance (and its Name tag) they run on,
# - the first 10 entries of service.events,
# - autoscaling.
# Exits the process with status 1 when the service does not exist.
def status
  service = describe_service
  unless service
    puts 'Unavailable'
    exit 1
  end

  # Only the first load balancer attached to the service is reported.
  unless service.load_balancers.empty?
    puts 'Load balancer:'
    ecs_elb_client.show_status(service.load_balancers[0])
  end

  puts 'Deployments:'
  service.deployments.each do |d|
    # Show "family:revision" instead of the full task definition ARN.
    abbrev_task_definition = d.task_definition.slice(%r{task-definition/(.+)\z}, 1)
    puts "  [#{d.status}] #{abbrev_task_definition} desired_count=#{d.desired_count}, pending_count=#{d.pending_count}, running_count=#{d.running_count}"
  end

  puts 'Tasks:'
  ecs_client.list_tasks(cluster: @cluster, service_name: service.service_arn).each do |page|
    # describe_tasks is only called for non-empty pages.
    unless page.task_arns.empty?
      tasks = ecs_client.describe_tasks(cluster: @cluster, tasks: page.task_arns).tasks
      # Index container instances by ARN and EC2 instances by ID so each task
      # can be joined to the host it runs on.
      container_instances = {}
      ecs_client.describe_container_instances(cluster: @cluster, container_instances: tasks.map(&:container_instance_arn)).container_instances.each do |ci|
        container_instances[ci.container_instance_arn] = ci
      end
      ec2_instances = {}
      ec2_client.describe_instances(instance_ids: container_instances.values.map(&:ec2_instance_id)).reservations.each do |r|
        r.instances.each do |i|
          ec2_instances[i.instance_id] = i
        end
      end
      tasks.each do |task|
        ci = container_instances[task.container_instance_arn]
        instance = ec2_instances[ci.ec2_instance_id]
        print "  [#{task.last_status}]: #{ci.ec2_instance_id}"
        # Append the EC2 Name tag when the instance was found and has one.
        if instance
          name_tag = instance.tags.find { |t| t.key == 'Name' }
          if name_tag
            print " (#{name_tag.value})"
          end
        end
        puts
      end
    end
  end

  puts 'Events:'
  service.events.first(10).each do |e|
    puts "  #{e.created_at}: #{e.message}"
  end

  if @autoscaling
    puts 'Autoscaling:'
    @autoscaling.status(service)
  else
    puts 'Autoscaling: No'
  end
end

#stop ⇒ nil

Returns:

  • (nil)


260
261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/hako/schedulers/ecs.rb', line 260

# Scales the service down to 0 tasks without deleting it. In dry-run mode the
# call is only logged.
def stop
  service = describe_service
  unless service
    puts "Service #{@app_id} doesn't exist"
    return
  end

  cluster_arn = service.cluster_arn
  service_arn = service.service_arn
  if @dry_run
    Hako.logger.info("ecs_client.update_service(cluster: #{cluster_arn}, service: #{service_arn}, desired_count: 0)")
  else
    ecs_client.update_service(cluster: cluster_arn, service: service_arn, desired_count: 0)
    Hako.logger.info("#{service_arn} is stopped")
  end
end

#stop_oneshot ⇒ nil

Returns:

  • (nil)


168
169
170
171
172
173
174
# File 'lib/hako/schedulers/ecs.rb', line 168

# Stops the running oneshot task, if any, and waits for it to finish.
#
# @return [Integer, nil] result of #wait_for_oneshot_finish, or nil when no
#   task is running
def stop_oneshot
  return unless @task

  Hako.logger.warn "Stopping #{@task.task_arn}"
  ecs_client.stop_task(cluster: @cluster, task: @task.task_arn, reason: 'Stopped by hako stop_oneshot')
  wait_for_oneshot_finish
end

#task_definition_changed?(family, definitions) ⇒ Boolean (private)

Parameters:

  • family (String)
  • definitions (Array<Hash>)

Returns:

  • (Boolean)


358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
# File 'lib/hako/schedulers/ecs.rb', line 358

# Decides whether a new revision must be registered for +family+ by comparing
# the desired +definitions+ with the family's current revision.
#
# @param family [String] task definition family name
# @param definitions [Array<Hash>] desired container definitions
# @return [Boolean] true when @force is set; when the task role, volumes or
#   any container differ; or when the current revision has containers that
#   are no longer desired
def task_definition_changed?(family, definitions)
  if @force
    return true
  end
  task_definition = ecs_client.describe_task_definition(task_definition: family).task_definition
  # Index the current revision's containers by name.
  container_definitions = {}
  task_definition.container_definitions.each do |c|
    container_definitions[c.name] = c
  end

  if task_definition.task_role_arn != @task_role_arn
    return true
  end
  if different_volumes?(task_definition.volumes)
    return true
  end
  # Each desired container is removed from the index as it is compared, so
  # whatever remains afterwards exists only in the current revision.
  if definitions.any? { |definition| different_definition?(definition, container_definitions.delete(definition[:name])) }
    return true
  end
  !container_definitions.empty?
rescue Aws::ECS::Errors::ClientException
  # Task definition does not exist
  # NOTE(review): any other Aws::ECS::Errors::ClientException raised in this
  # method is also treated as "changed".
  true
end

#update_service(current_service, task_definition_arn) ⇒ Aws::ECS::Types::Service, Symbol (private)

Parameters:

  • current_service (Aws::ECS::Types::Service)
  • task_definition_arn (String)

Returns:

  • (Aws::ECS::Types::Service, Symbol)


591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
# File 'lib/hako/schedulers/ecs.rb', line 591

# Points the service at +task_definition_arn+ with the configured desired
# count and deployment configuration, but only when something differs.
# Placement settings are never updated; a warning is logged when they
# changed.
#
# @param current_service [Aws::ECS::Types::Service]
# @param task_definition_arn [String]
# @return [Aws::ECS::Types::Service, Symbol] the updated service, or :noop
def update_service(current_service, task_definition_arn)
  # Keep current desired_count if autoscaling is enabled
  desired_count = @autoscaling ? current_service.desired_count : @desired_count
  params = {
    cluster: @cluster,
    service: @app_id,
    desired_count: desired_count,
    task_definition: task_definition_arn,
    deployment_configuration: @deployment_configuration,
  }
  warn_placement_policy_change(current_service)
  return :noop unless service_changed?(current_service, params)

  ecs_client.update_service(params).service
end

#volumes_definition ⇒ Array<Hash> (private)

Returns:

  • (Array<Hash>)


449
450
451
452
453
454
455
456
# File 'lib/hako/schedulers/ecs.rb', line 449

# Converts the configured @volumes into the ECS task definition `volumes`
# parameter: one host-path volume per entry.
#
# @return [Array<Hash>]
def volumes_definition
  @volumes.each_with_object([]) do |(volume_name, volume), result|
    result << { name: volume_name, host: { source_path: volume['source_path'] } }
  end
end

#wait_for_oneshot_finish ⇒ Fixnum (private)

Returns:

  • (Fixnum)


526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
# File 'lib/hako/schedulers/ecs.rb', line 526

# Waits for the current oneshot task to stop, clears @task, and logs each
# container's exit status.
#
# @return [Fixnum] exit code of the 'app' container, or 127 when it stopped
#   without one
def wait_for_oneshot_finish
  containers = wait_for_task(@task)
  @task = nil
  Hako.logger.info 'Oneshot task finished'
  app_exit_code = 127
  containers.each do |name, container|
    code = container.exit_code
    if code
      Hako.logger.info "#{name} has stopped with exit_code=#{code}"
      app_exit_code = code if name == 'app'
    else
      Hako.logger.info "#{name} has stopped without exit_code: reason=#{container.reason}"
    end
  end
  app_exit_code
end

#wait_for_ready(service) ⇒ Boolean (private)

Parameters:

  • service (Aws::ECS::Types::Service)

Returns:

  • (Boolean)


643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
# File 'lib/hako/schedulers/ecs.rb', line 643

# Polls the service every second, logging new service events, until the
# PRIMARY deployment runs its desired count and no ACTIVE (old) deployment
# remains.
#
# @param service [Aws::ECS::Types::Service]
# @return [Boolean] true when ready; false on @timeout (seconds, when set) or
#   when more tasks have started than the PRIMARY deployment wants, i.e. some
#   started tasks stopped
def wait_for_ready(service)
  latest_event_id = find_latest_event_id(service.events)
  Hako.logger.debug "  latest_event_id=#{latest_event_id}"
  started_at =
    if @timeout
      Process.clock_gettime(Process::CLOCK_MONOTONIC)
    end

  started_task_ids = []

  loop do
    if started_at
      if Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at > @timeout
        Hako.logger.error('Timed out')
        return false
      end
    end

    s = ecs_client.describe_services(cluster: service.cluster_arn, services: [service.service_arn]).services[0]
    if s.nil?
      Hako.logger.debug "Service #{service.service_arn} could not be described"
      sleep 1
      next
    end
    # Log only events newer than the last one seen.
    s.events.each do |e|
      if e.id == latest_event_id
        break
      end
      Hako.logger.info "#{e.created_at}: #{e.message}"
      task_id = extract_task_id(e.message)
      if task_id && e.message.include?(' has started ')
        started_task_ids << task_id
      end
    end
    latest_event_id = find_latest_event_id(s.events)
    Hako.logger.debug "  latest_event_id=#{latest_event_id}, deployments=#{s.deployments}"
    no_active = s.deployments.all? { |d| d.status != 'ACTIVE' }
    primary = s.deployments.find { |d| d.status == 'PRIMARY' }
    # `primary` can be nil (the readiness check below already allows for it),
    # so guard before dereferencing it here as well.
    if primary && primary.desired_count < started_task_ids.size
      Hako.logger.error('Some started tasks are stopped. It seems new deployment is failing to start')
      ecs_client.describe_tasks(cluster: service.cluster_arn, tasks: started_task_ids).tasks.each do |task|
        report_task_diagnostics(task)
      end
      return false
    end
    primary_ready = primary && primary.running_count == primary.desired_count
    if no_active && primary_ready
      return true
    else
      sleep 1
    end
  end
end

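#wait_for_task(task) ⇒ Hash<String, Aws::ECS::Types::Container> (private)

Parameters:

  • task (Aws::ECS::Types::Task)

Returns:

  • (Hash<String, Aws::ECS::Types::Container>)

    containers of the stopped task, keyed by container name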


546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
# File 'lib/hako/schedulers/ecs.rb', line 546

# Polls +task+ every second until it reaches STOPPED.
#
# Along the way it reports the container instance whenever it changes, and
# logs the start time once it is known.
#
# @param task [Aws::ECS::Types::Task]
# @return [Hash<String, Aws::ECS::Types::Container>] the stopped task's
#   containers keyed by name
def wait_for_task(task)
  task_arn = task.task_arn
  loop do
    current = ecs_client.describe_tasks(cluster: @cluster, tasks: [task_arn]).tasks[0]
    if current.nil?
      Hako.logger.debug "Task #{task_arn} could not be described"
      sleep 1
      next
    end

    if @container_instance_arn != current.container_instance_arn
      @container_instance_arn = current.container_instance_arn
      report_container_instance(@container_instance_arn)
    end
    unless @started_at
      @started_at = current.started_at
      Hako.logger.info "Started at #{@started_at}" if @started_at
    end

    Hako.logger.debug "  status #{current.last_status}"

    if current.last_status == 'STOPPED'
      Hako.logger.info "Stopped at #{current.stopped_at} (reason: #{current.stopped_reason})"
      return current.containers.each_with_object({}) { |c, by_name| by_name[c.name] = c }
    end
    sleep 1
  end
end

#warn_placement_policy_change(service) ⇒ nil (private)

Parameters:

  • service (Aws::ECS::Types::Service)

Returns:

  • (nil)


867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
# File 'lib/hako/schedulers/ecs.rb', line 867

# Warns when the configured placement constraints or strategy differ from the
# live service, since update_service does not send them. The live values are
# normalized to the config's string-keyed shape, with strategy fields
# lowercased, before comparing.
#
# @param service [Aws::ECS::Types::Service]
def warn_placement_policy_change(service)
  live_constraints = service.placement_constraints.map do |constraint|
    entry = { 'type' => constraint.type }
    entry['expression'] = constraint.expression unless constraint.expression.nil?
    entry
  end
  if @placement_constraints != live_constraints
    Hako.logger.warn "Ignoring updated placement_constraints in the configuration, because AWS doesn't allow updating them for now."
  end

  live_strategy = service.placement_strategy.map do |strategy|
    entry = { 'type' => strategy.type }
    entry['field'] = strategy.field.downcase unless strategy.field.nil?
    entry
  end
  if @placement_strategy != live_strategy
    Hako.logger.warn "Ignoring updated placement_strategy in the configuration, because AWS doesn't allow updating them for now."
  end
end