Class: EcsDeploy::AutoScaler::ClusterResourceManager
- Inherits: Object
- Ancestry: Object > EcsDeploy::AutoScaler::ClusterResourceManager
- Defined in:
- lib/ecs_deploy/auto_scaler/cluster_resource_manager.rb
Defined Under Namespace
Classes: DeregisterContainerInstanceFailed
Constant Summary
- MAX_DESCRIBABLE_SERVICE_COUNT = 10
Instance Method Summary
Constructor Details
#initialize(region:, cluster:, service_configs:, logger: nil, capacity_based_on:) ⇒ ClusterResourceManager
Returns a new instance of ClusterResourceManager.
Source lines 12–26:
# File 'lib/ecs_deploy/auto_scaler/cluster_resource_manager.rb', line 12
# Sets up capacity bookkeeping for a single ECS cluster.
#
# @param region [String] stored as @region (not otherwise read in this method)
# @param cluster [String] cluster name/ARN passed to the ECS calls in this class
# @param service_configs [Array] configs responding to #desired_count,
#   #required_capacity and #name
# @param logger [Logger, nil] optional logger for debug/info messages
# @param capacity_based_on [String] capacity unit, "instances" or "vCPUs"
# @raise [ArgumentError] when capacity_based_on is neither "instances" nor "vCPUs"
def initialize(region:, cluster:, service_configs:, logger: nil, capacity_based_on:)
  @cluster = cluster
  @region = region
  @service_configs = service_configs
  @logger = logger
  @capacity_based_on = capacity_based_on
  unless @capacity_based_on == "instances" || @capacity_based_on == "vCPUs"
    raise ArgumentError, 'capacity_based_on should be either "instances" or "vCPUs"'
  end

  # Lock + condition variable guarding @capacity and @used_capacity.
  @mutex = Mutex.new
  @resource = ConditionVariable.new
  # Capacity already claimed by the services at their current desired counts.
  @used_capacity = @service_configs.sum { |svc| svc.desired_count * svc.required_capacity }
  @capacity = calculate_active_instance_capacity
end
|
Instance Method Details
#acquire(capacity, timeout: nil) ⇒ Object
Source lines 28–42:
# File 'lib/ecs_deploy/auto_scaler/cluster_resource_manager.rb', line 28
# Blocks until +capacity+ units are free, then reserves them.
#
# While free capacity (@capacity - @used_capacity) is smaller than the request,
# the caller waits on @resource, which #release and capacity refreshes broadcast.
#
# @param capacity [Numeric] number of units to reserve
# @param timeout [Numeric, nil] maximum seconds to wait; nil or 0 means wait
#   indefinitely (the semantics Timeout.timeout previously gave this argument)
# @return [Boolean] true once reserved, false if the timeout elapsed first
def acquire(capacity, timeout: nil)
  @mutex.synchronize do
    @logger&.debug("#{log_prefix} Trying to acquire #{capacity} capacity (capacity: #{@capacity}, used_capacity: #{@used_capacity})")
    # Rely on the condition variable's own timeout rather than Timeout.timeout,
    # which spawns a timer thread and interrupts this one asynchronously.
    deadline = (timeout.nil? || timeout.zero?) ? nil : Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    while @capacity - @used_capacity < capacity
      if deadline
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        # Leaving the synchronize block via return still releases the mutex.
        return false if remaining <= 0
        @resource.wait(@mutex, remaining)
      else
        @resource.wait(@mutex)
      end
    end
    @used_capacity += capacity
    @logger&.debug("#{log_prefix} Acquired #{capacity} capacity (capacity: #{@capacity}, used_capacity: #{@used_capacity})")
  end
  true
end
|
#calculate_active_instance_capacity ⇒ Object
Source lines 116–134:
# File 'lib/ecs_deploy/auto_scaler/cluster_resource_manager.rb', line 116
# Measures the capacity provided by the cluster's ACTIVE container instances.
#
# The unit follows @capacity_based_on:
# - "instances": the number of ACTIVE container instances
# - "vCPUs": the sum of every instance's registered "CPU" resource divided by
#   1024 (ECS counts 1 vCPU as 1024 CPU units); `/` truncates for Integer totals
#
# @return [Integer]
def calculate_active_instance_capacity
  cl = ecs_client
  # Enumerating the listing response yields one page of ARNs at a time.
  pages = cl.list_container_instances(cluster: @cluster, status: "ACTIVE")
  return pages.sum { |page| page.container_instance_arns.size } if @capacity_based_on == "instances"

  total_cpu = pages.sum do |page|
    # Describing an empty ARN list is pointless, so empty pages contribute 0.
    next 0 if page.container_instance_arns.empty?
    cl.describe_container_instances(
      cluster: @cluster,
      container_instances: page.container_instance_arns,
    ).container_instances.sum { |ci| ci.registered_resources.find { |r| r.name == "CPU" }.integer_value }
  end
  total_cpu / 1024
end
|
#deregister_container_instance(container_instance_arn) ⇒ Object
Source lines 70–74:
# File 'lib/ecs_deploy/auto_scaler/cluster_resource_manager.rb', line 70
# Force-deregisters a single container instance from the cluster.
#
# @param container_instance_arn [String] the container instance to remove
# @return whatever the ECS client's deregister call returns
# @raise [DeregisterContainerInstanceFailed] when ECS rejects the request with
#   Aws::ECS::Errors::InvalidParameterException (kept as the new error's #cause)
def deregister_container_instance(container_instance_arn)
  request = { cluster: @cluster, container_instance: container_instance_arn, force: true }
  begin
    ecs_client.deregister_container_instance(**request)
  rescue Aws::ECS::Errors::InvalidParameterException
    raise DeregisterContainerInstanceFailed
  end
end
|
#fetch_container_instance_arns_in_service ⇒ Object
Source lines 65–68:
# File 'lib/ecs_deploy/auto_scaler/cluster_resource_manager.rb', line 65
# Lists the ARNs of container instances running tasks of the managed services.
#
# Builds the ECS filter "task:group in [service:<name>,...]" from the service
# configs and flattens the ARNs from every page of the listing response.
#
# @return [Array<String>] container instance ARNs ([] when there are no services)
def fetch_container_instance_arns_in_service
  # With no services nothing can match; avoid sending the degenerate
  # expression "task:group in []" to the API.
  return [] if @service_configs.empty?

  task_groups = @service_configs.map { |s| "service:#{s.name}" }
  ecs_client.list_container_instances(cluster: @cluster, filter: "task:group in [#{task_groups.join(",")}]").flat_map(&:container_instance_arns)
end
|
#fetch_container_instances_in_cluster ⇒ Object
Source lines 53–63:
# File 'lib/ecs_deploy/auto_scaler/cluster_resource_manager.rb', line 53
# Describes every container instance registered in the cluster (no status filter).
#
# Each page of the listing is described separately. Empty pages are skipped
# individually rather than only checking the first page. Previously an empty
# later page was sent to describe_container_instances, and an empty first page
# hid any later pages.
#
# @return [Array] the described container instances from all pages ([] if none)
def fetch_container_instances_in_cluster
  cl = ecs_client
  cl.list_container_instances(cluster: @cluster).flat_map do |page|
    next [] if page.container_instance_arns.empty?
    cl.describe_container_instances(cluster: @cluster, container_instances: page.container_instance_arns).container_instances
  end
end
|
#release(capacity) ⇒ Object
Source lines 44–51:
# File 'lib/ecs_deploy/auto_scaler/cluster_resource_manager.rb', line 44
# Returns +capacity+ units to the pool and wakes all threads waiting in #acquire.
#
# @param capacity [Numeric] units previously reserved with #acquire
# @return [true]
def release(capacity)
  @mutex.synchronize do
    @used_capacity -= capacity
    @resource.broadcast
    # Log while holding the lock, as #acquire does, so the reported numbers are
    # a consistent snapshot rather than values another thread may be changing.
    @logger&.debug("#{log_prefix} Released #{capacity} capacity (capacity: #{@capacity}, used_capacity: #{@used_capacity})")
  end
  true
end
|
#trigger_capacity_update(old_desired_capacity, new_desired_capacity, interval: 5, wait_until_capacity_updated: false) ⇒ Object
Source lines 76–114:
# File 'lib/ecs_deploy/auto_scaler/cluster_resource_manager.rb', line 76
# Polls the cluster's capacity in a background thread until it reaches the target.
#
# The thread stops when @capacity reaches +new_desired_capacity+, or passes it
# in the direction of the change. After every refresh it wakes threads blocked
# in #acquire. If +timeout+ seconds elapse first, the timeout is logged: as a
# warning when @capacity_based_on is "vCPUs", otherwise as an error.
#
# @param old_desired_capacity [Numeric] capacity before the scaling change
# @param new_desired_capacity [Numeric] capacity the cluster is being scaled to
# @param interval [Numeric] seconds to sleep between refreshes
# @param wait_until_capacity_updated [Boolean] when true, join the polling thread
# @param timeout [Numeric] seconds before polling gives up (defaults to 180)
# @return [Thread, nil] the joined thread when waiting, otherwise nil
def trigger_capacity_update(old_desired_capacity, new_desired_capacity, interval: 5, wait_until_capacity_updated: false, timeout: 180)
  return if new_desired_capacity == old_desired_capacity

  th = Thread.new do
    @logger&.info "#{log_prefix} Updating capacity: #{old_desired_capacity} -> #{new_desired_capacity}"
    Timeout.timeout(timeout) do
      until @capacity == new_desired_capacity ||
          (new_desired_capacity > old_desired_capacity && @capacity > new_desired_capacity) ||
          (new_desired_capacity < old_desired_capacity && @capacity < new_desired_capacity)
        begin
          # Query ECS outside the mutex so acquire/release are not blocked by
          # API latency; only the assignment and the wake-up need the lock.
          latest_capacity = calculate_active_instance_capacity
          @mutex.synchronize do
            @capacity = latest_capacity
            @resource.broadcast
          end
        rescue => e
          AutoScaler.error_logger.warn("#{log_prefix} `#{__method__}': #{e} (#{e.class})")
        end
        sleep interval
      end
      @logger&.info "#{log_prefix} updated capacity to #{@capacity}"
    end
  rescue Timeout::Error => e
    msg = "#{log_prefix} `#{__method__}': #{e} (#{e.class})"
    if @capacity_based_on == "vCPUs"
      AutoScaler.error_logger.warn(msg)
    else
      AutoScaler.error_logger.error(msg)
    end
  end

  if wait_until_capacity_updated
    @logger&.info "#{log_prefix} Waiting for the number of active instances to reach #{new_desired_capacity} (from #{old_desired_capacity})"
    th.join
  end
end
|