Class: EcsDeploy::AutoScaler::ClusterResourceManager

Inherits:
Object
  • Object
show all
Defined in:
lib/ecs_deploy/auto_scaler/cluster_resource_manager.rb

Defined Under Namespace

Classes: DeregisterContainerInstanceFailed

Constant Summary collapse

MAX_DESCRIBABLE_SERVICE_COUNT =
10

Instance Method Summary collapse

Constructor Details

#initialize(region:, cluster:, service_configs:, logger: nil, capacity_based_on:) ⇒ ClusterResourceManager

Returns a new instance of ClusterResourceManager.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/ecs_deploy/auto_scaler/cluster_resource_manager.rb', line 12

# Builds a resource manager for one ECS cluster.
#
# @param region [Object] stored in @region
# @param cluster [Object] cluster identifier passed to the ECS API calls made by this class
# @param service_configs [Enumerable] objects responding to #desired_count and #required_capacity
# @param logger [#debug, #info, nil] optional logger; nothing is logged when nil
# @param capacity_based_on [String] unit used for capacity, either "instances" or "vCPUs"
# @raise [ArgumentError] when capacity_based_on is neither "instances" nor "vCPUs"
def initialize(region:, cluster:, service_configs:, logger: nil, capacity_based_on:)
  @region, @cluster, @logger = region, cluster, logger
  @service_configs = service_configs
  @capacity_based_on = capacity_based_on
  unless %w[instances vCPUs].include?(@capacity_based_on)
    raise ArgumentError, 'capacity_based_on should be either "instances" or "vCPUs"'
  end

  # @mutex guards @capacity / @used_capacity; @resource is signalled whenever either changes.
  @mutex = Mutex.new
  @resource = ConditionVariable.new

  # Capacity already claimed by the configured services at their current desired counts.
  @used_capacity = @service_configs.map { |config| config.desired_count * config.required_capacity }.sum
  @capacity = calculate_active_instance_capacity
end

Instance Method Details

#acquire(capacity, timeout: nil) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/ecs_deploy/auto_scaler/cluster_resource_manager.rb', line 28

# Reserves +capacity+ units of cluster capacity, blocking until enough is free.
#
# The wait is bounded with ConditionVariable#wait's own timeout and a monotonic
# deadline rather than Timeout.timeout, so no exception is raised asynchronously
# into this thread while it holds (or is re-acquiring) the mutex.
#
# @param capacity [Numeric] amount of capacity to reserve
# @param timeout [Numeric, nil] maximum number of seconds to wait; nil or 0 waits
#   indefinitely (the same convention Timeout.timeout uses)
# @return [Boolean] true once the capacity has been reserved, false if the
#   timeout elapsed first (nothing is reserved in that case)
def acquire(capacity, timeout: nil)
  @mutex.synchronize do
    @logger&.debug("#{log_prefix} Trying to acquire #{capacity} capacity (capacity: #{@capacity}, used_capacity: #{@used_capacity})")

    unlimited = timeout.nil? || timeout.zero?
    deadline = unlimited ? nil : Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

    while @capacity - @used_capacity < capacity
      if deadline
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        # Leaving the synchronize block via return still unlocks the mutex.
        return false if remaining <= 0

        @resource.wait(@mutex, remaining)
      else
        @resource.wait(@mutex)
      end
    end

    @used_capacity += capacity
    @logger&.debug("#{log_prefix} Acquired #{capacity} capacity (capacity: #{@capacity}, used_capacity: #{@used_capacity})")
  end
  true
end

#calculate_active_instance_capacity ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/ecs_deploy/auto_scaler/cluster_resource_manager.rb', line 116

# Computes the capacity currently provided by the cluster's ACTIVE container instances.
#
# When @capacity_based_on is "instances" the result is the number of ACTIVE
# container instances. Otherwise the registered "CPU" resource of every ACTIVE
# instance is summed and divided by 1024 (integer division).
#
# @return [Integer] instance count, or summed CPU units / 1024
def calculate_active_instance_capacity
  # Fetch the client once and reuse it for every call below.
  cl = ecs_client
  pages = cl.list_container_instances(cluster: @cluster, status: "ACTIVE")

  if @capacity_based_on == "instances"
    return pages.sum { |page| page.container_instance_arns.size }
  end

  total_cpu = pages.sum do |page|
    arns = page.container_instance_arns
    # Skip empty pages so describe is never called with an empty ARN list.
    next 0 if arns.empty?

    described = cl.describe_container_instances(cluster: @cluster, container_instances: arns)
    described.container_instances.sum do |ci|
      # An instance without a "CPU" resource contributes 0 instead of raising on nil.
      ci.registered_resources.find { |r| r.name == "CPU" }&.integer_value.to_i
    end
  end

  total_cpu / 1024
end

#deregister_container_instance(container_instance_arn) ⇒ Object



70
71
72
73
74
# File 'lib/ecs_deploy/auto_scaler/cluster_resource_manager.rb', line 70

# Force-deregisters one container instance from @cluster.
#
# @param container_instance_arn [String] identifier of the container instance to deregister
# @return [Object] whatever the ECS client returns for the deregistration call
# @raise [DeregisterContainerInstanceFailed] when the ECS client raises
#   Aws::ECS::Errors::InvalidParameterException
def deregister_container_instance(container_instance_arn)
  request = {
    cluster: @cluster,
    container_instance: container_instance_arn,
    force: true,
  }
  ecs_client.deregister_container_instance(**request)
rescue Aws::ECS::Errors::InvalidParameterException
  # Translate the SDK error into this class's own failure type.
  raise DeregisterContainerInstanceFailed
end

#fetch_container_instance_arns_in_service ⇒ Object



65
66
67
68
# File 'lib/ecs_deploy/auto_scaler/cluster_resource_manager.rb', line 65

# Lists the ARNs of container instances matching the task groups of the configured services.
#
# The list call is filtered with "task:group in [service:<name>,...]", built
# from the name of every entry in @service_configs.
#
# @return [Array] container instance ARNs collected from every page of the response
def fetch_container_instance_arns_in_service
  group_list = @service_configs.map { |config| "service:#{config.name}" }.join(",")
  pages = ecs_client.list_container_instances(cluster: @cluster, filter: "task:group in [#{group_list}]")
  pages.flat_map { |page| page.container_instance_arns }
end

#fetch_container_instances_in_cluster ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
# File 'lib/ecs_deploy/auto_scaler/cluster_resource_manager.rb', line 53

# Lists every container instance in @cluster and returns their descriptions.
#
# Each page of the list response is described separately. Pages without any
# ARNs are skipped, so describe is never called with an empty list.
#
# @return [Array] the container_instances entries of every describe response;
#   empty when the cluster has no container instances
def fetch_container_instances_in_cluster
  cl = ecs_client
  cl.list_container_instances(cluster: @cluster).flat_map do |page|
    arns = page.container_instance_arns
    next [] if arns.empty?

    cl.describe_container_instances(cluster: @cluster, container_instances: arns).container_instances
  end
end

#release(capacity) ⇒ Object



44
45
46
47
48
49
50
51
# File 'lib/ecs_deploy/auto_scaler/cluster_resource_manager.rb', line 44

# Returns +capacity+ units to the pool and wakes every thread waiting on @resource.
#
# The values that get logged are captured while the mutex is still held, so the
# message reflects the state produced by this release even if another thread
# changes the counters immediately afterwards.
#
# @param capacity [Numeric] amount of capacity to give back
# @return [true]
def release(capacity)
  total, used = @mutex.synchronize do
    @used_capacity -= capacity
    @resource.broadcast
    [@capacity, @used_capacity]
  end
  @logger&.debug("#{log_prefix} Released #{capacity} capacity (capacity: #{total}, used_capacity: #{used})")
  true
end

#trigger_capacity_update(old_desired_capacity, new_desired_capacity, interval: 5, wait_until_capacity_updated: false) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/ecs_deploy/auto_scaler/cluster_resource_manager.rb', line 76

# Starts a background thread that refreshes @capacity until it reflects a
# desired-capacity change, broadcasting on @resource after every refresh.
#
# Polling stops as soon as @capacity equals the new desired capacity, or has
# moved past it in the direction of the change, without sleeping once more
# after the target has been reached. The whole poll is limited to 180 seconds;
# on timeout a message is sent to AutoScaler.error_logger (warn in "vCPUs"
# mode, error otherwise).
#
# @param old_desired_capacity [Numeric] desired capacity before the change
# @param new_desired_capacity [Numeric] desired capacity after the change
# @param interval [Numeric] seconds to sleep between refreshes
# @param wait_until_capacity_updated [Boolean] when true, block until the
#   background thread has finished
# @return [Thread, nil] the joined thread when waiting, otherwise nil (also nil
#   when the desired capacity did not change and nothing was started)
def trigger_capacity_update(old_desired_capacity, new_desired_capacity, interval: 5, wait_until_capacity_updated: false)
  return if new_desired_capacity == old_desired_capacity

  # old and new differ here, so "reached or overshot" reduces to one comparison per direction.
  scaling_out = new_desired_capacity > old_desired_capacity
  reached = lambda do
    scaling_out ? @capacity >= new_desired_capacity : @capacity <= new_desired_capacity
  end

  th = Thread.new do
    @logger&.info "#{log_prefix} Updating capacity: #{old_desired_capacity} -> #{new_desired_capacity}"
    Timeout.timeout(180) do
      until reached.call
        @mutex.synchronize do
          @capacity = calculate_active_instance_capacity
          @resource.broadcast
        rescue => e
          # A failed refresh is only logged; the loop retries after the sleep below.
          AutoScaler.error_logger.warn("#{log_prefix} `#{__method__}': #{e} (#{e.class})")
        end

        # Do not make waiters sit through a pointless extra interval once the target is met.
        break if reached.call

        sleep interval
      end
      @logger&.info "#{log_prefix} updated capacity to #{@capacity}"
    end
  rescue Timeout::Error => e
    msg = "#{log_prefix} `#{__method__}': #{e} (#{e.class})"
    if @capacity_based_on == "vCPUs"
      # Timeout::Error sometimes occurs in this mode.
      # For example, the following case never meets the loop's exit condition:
      #   * old_desired_capacity is 102
      #   * new_desired_capacity is 101
      #   * all instances have 2 vCPUs
      AutoScaler.error_logger.warn(msg)
    else
      AutoScaler.error_logger.error(msg)
    end
  end

  if wait_until_capacity_updated
    @logger&.info "#{log_prefix} Waiting for the number of active instances to reach #{new_desired_capacity} (from #{old_desired_capacity})"
    th.join
  end
end