Class: Hako::Schedulers::EcsServiceDiscovery

Inherits:
Object
  • Object
show all
Defined in:
lib/hako/schedulers/ecs_service_discovery.rb

Instance Method Summary collapse

Constructor Details

#initialize(config, region, dry_run:) ⇒ EcsServiceDiscovery

Returns a new instance of EcsServiceDiscovery.

Parameters:

  • config (Array<Hash>)
  • dry_run (Boolean)
  • region (String)


14
15
16
17
18
# File 'lib/hako/schedulers/ecs_service_discovery.rb', line 14

# Remembers the settings used by the other methods of this class.
#
# @param config [Array<Hash>] service discovery entries; each entry is read
#   with `fetch('service')` by {#apply} and {#service_registries}
# @param region [String] region passed to the AWS Service Discovery client
# @param dry_run [Boolean] when true, {#apply} and {#remove} only log the
#   action they would have taken
def initialize(config, region, dry_run:)
  @config = config
  @region = region
  @dry_run = dry_run
end

Instance Method Details

#apply ⇒ void

This method returns an undefined value.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/hako/schedulers/ecs_service_discovery.rb', line 21

# Creates or updates the service discovery service of every configured entry.
#
# For each entry the namespace must exist and have type DNS_PRIVATE. A service
# that is not found is created; an existing one is updated when
# `service_changed?` reports a difference, after which
# `warn_disallowed_service_change` is called. In dry-run mode the create and
# update steps are only logged.
#
# @return [void]
# @raise [Error] if the namespace is not found or is not a private DNS namespace
def apply
  @config.map do |entry|
    definition = entry.fetch('service')
    namespace_id = definition.fetch('namespace_id')
    namespace = get_namespace(namespace_id)
    unless namespace
      raise Error.new("Service discovery namespace #{namespace_id} not found")
    end
    unless namespace.type == 'DNS_PRIVATE'
      raise Error.new("ECS only supports registering a service into a private DNS namespace: #{namespace.name} (#{namespace_id})")
    end

    name = definition.fetch('name')
    existing = find_service(namespace_id, name)
    if existing
      if service_changed?(definition, existing)
        if @dry_run
          Hako.logger.info("Updated service discovery service #{name} (#{existing.id}) (dry-run)")
        else
          update_service(existing.id, definition)
          Hako.logger.info("Updated service discovery service #{name} (#{existing.id})")
        end
      end
      # Always checked for existing services, whether or not an update happened.
      warn_disallowed_service_change(definition, existing)
    elsif @dry_run
      Hako.logger.info("Created service discovery service #{name} (dry-run)")
    else
      created = create_service(definition)
      Hako.logger.info("Created service discovery service #{name} (#{created.id})")
    end
  end
end

#create_service(service) ⇒ Aws::ServiceDiscovery::Types::Service (private)

Parameters:

  • service (Hash)

Returns:

  • (Aws::ServiceDiscovery::Types::Service)


148
149
150
# File 'lib/hako/schedulers/ecs_service_discovery.rb', line 148

# Creates a service discovery service from a configuration entry.
#
# @param service [Hash] service definition, converted by `create_service_params`
# @return [Aws::ServiceDiscovery::Types::Service] the `service` member of the
#   create_service response
def create_service(service)
  client = service_discovery_client
  response = client.create_service(create_service_params(service))
  response.service
end

#create_service_params(service) ⇒ Hash (private)

Parameters:

  • service (Hash)

Returns:

  • (Hash)


154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/hako/schedulers/ecs_service_discovery.rb', line 154

# Builds the request parameters for creating a service discovery service.
#
# `dns_config`, `name`, `dns_config.dns_records` and each record's `type` and
# `ttl` are required (read with `fetch`). `routing_policy` falls back to
# 'MULTIVALUE'. `health_check_custom_config` is only included when the
# definition has it.
#
# @param service [Hash] string-keyed service definition
# @return [Hash] symbol-keyed request parameters
# @raise [KeyError] if a required key is missing
def create_service_params(service)
  dns_settings = service.fetch('dns_config')
  request = {
    name: service.fetch('name'),
    namespace_id: service['namespace_id'],
    description: service['description'],
    dns_config: {
      namespace_id: dns_settings['namespace_id'],
      routing_policy: dns_settings.fetch('routing_policy', 'MULTIVALUE'),
      dns_records: dns_settings.fetch('dns_records').map do |record|
        { type: record.fetch('type'), ttl: record.fetch('ttl') }
      end,
    },
  }

  custom_health_check = service['health_check_custom_config']
  if custom_health_check
    request[:health_check_custom_config] = {
      failure_threshold: custom_health_check['failure_threshold'],
    }
  end
  request
end

#find_service(namespace_id, service_name) ⇒ Aws::ServiceDiscovery::Types::ServiceSummary? (private)

Parameters:

  • namespace_id (String)
  • service_name (String)

Returns:

  • (Aws::ServiceDiscovery::Types::ServiceSummary, nil)


129
130
131
132
133
134
135
136
137
138
139
# File 'lib/hako/schedulers/ecs_service_discovery.rb', line 129

# Looks up a service by name among the services listed for a namespace.
#
# Every element yielded by the list_services result contributes its
# `services`; the first service whose name equals `service_name` is returned.
#
# @param namespace_id [String] value used for the NAMESPACE_ID filter
# @param service_name [String] exact name to match
# @return [Aws::ServiceDiscovery::Types::ServiceSummary, nil] nil when no
#   listed service has that name
def find_service(namespace_id, service_name)
  namespace_filter = {
    name: 'NAMESPACE_ID',
    values: [namespace_id],
    condition: 'EQ',
  }
  request = { filters: [namespace_filter] }
  pages = service_discovery_client.list_services(request)
  candidates = pages.flat_map { |page| page.services }
  candidates.find { |candidate| candidate.name == service_name }
end

#get_namespace(namespace_id) ⇒ Aws::ServiceDiscovery::Types::Namespace? (private)

Parameters:

  • namespace_id (String)

Returns:

  • (Aws::ServiceDiscovery::Types::Namespace, nil)


237
238
239
240
241
# File 'lib/hako/schedulers/ecs_service_discovery.rb', line 237

# Fetches a namespace by ID.
#
# @param namespace_id [String]
# @return [Aws::ServiceDiscovery::Types::Namespace, nil] nil when the client
#   raises NamespaceNotFound
def get_namespace(namespace_id)
  response = service_discovery_client.get_namespace(id: namespace_id)
  response.namespace
rescue Aws::ServiceDiscovery::Errors::NamespaceNotFound
  # A missing namespace is reported as nil; any other error propagates.
  nil
end

#get_service(service_id) ⇒ Aws::ServiceDiscovery::Types::Service? (private)

Parameters:

  • service_id (String)

Returns:

  • (Aws::ServiceDiscovery::Types::Service, nil)


229
230
231
232
233
# File 'lib/hako/schedulers/ecs_service_discovery.rb', line 229

# Fetches a service discovery service by ID.
#
# @param service_id [String]
# @return [Aws::ServiceDiscovery::Types::Service, nil] nil when the client
#   raises ServiceNotFound
def get_service(service_id)
  response = service_discovery_client.get_service(id: service_id)
  response.service
rescue Aws::ServiceDiscovery::Errors::ServiceNotFound
  # A missing service is reported as nil; any other error propagates.
  nil
end

#remove(service_registries) ⇒ void

This method returns an undefined value.



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/hako/schedulers/ecs_service_discovery.rb', line 73

# Deletes the service discovery service behind each service registry.
#
# The service ID is taken from the part of `registry_arn` after "service/".
# Registries whose service no longer exists are logged and skipped. In dry-run
# mode the deletion is only logged. Otherwise deletion is attempted up to 10
# times, waiting 10 seconds between attempts, for as long as the client raises
# ResourceInUse.
#
# @param service_registries [Enumerable] objects responding to `registry_arn`
# @return [void]
# @raise [Error] if the service is still in use after the last attempt
def remove(service_registries)
  service_registries.each do |service_registry|
    service_id = service_registry.registry_arn.slice(%r{service/(.+)\z}, 1)
    service = get_service(service_id)
    unless service
      # Only the ID is known here; the name would come from the service record
      # that could not be fetched.
      Hako.logger.info("Service discovery service #{service_id} doesn't exist")
      next
    end
    if @dry_run
      Hako.logger.info("Deleted service discovery service #{service.name} (#{service.id}) (dry-run)")
    else
      deleted = false
      10.times do |i|
        # No wait before the first attempt, 10 seconds before each retry.
        sleep 10 unless i.zero?
        begin
          service_discovery_client.delete_service(id: service.id)
          deleted = true
          break
        rescue Aws::ServiceDiscovery::Errors::ResourceInUse => e
          Hako.logger.warn("#{e.class}: #{e.message}")
        end
      end
      unless deleted
        raise Error.new("Unable to delete service discovery service #{service.name} (#{service.id})")
      end

      Hako.logger.info("Deleted service discovery service #{service.name} (#{service.id})")
    end
  end
end

#service_changed?(expected_service, actual_service) ⇒ Boolean (private)

Parameters:

  • expected_service (Hash)
  • actual_service (Aws::ServiceDiscovery::Types::ServiceSummary)

Returns:

  • (Boolean)


182
183
184
# File 'lib/hako/schedulers/ecs_service_discovery.rb', line 182

# Tells whether the configured service differs from the existing one.
#
# The comparison is delegated to EcsServiceDiscoveryServiceComparator, built
# from the parameters produced by `update_service_params`.
#
# @param expected_service [Hash] service definition from the configuration
# @param actual_service [Aws::ServiceDiscovery::Types::ServiceSummary]
# @return [Boolean]
def service_changed?(expected_service, actual_service)
  comparator = EcsServiceDiscoveryServiceComparator.new(update_service_params(expected_service))
  comparator.different?(actual_service)
end

#service_discovery_client ⇒ Aws::ServiceDiscovery::Client (private)

Returns:

  • (Aws::ServiceDiscovery::Client)


142
143
144
# File 'lib/hako/schedulers/ecs_service_discovery.rb', line 142

# Returns the AWS Service Discovery client, creating it for `@region` on first
# use and reusing the same instance afterwards.
#
# @return [Aws::ServiceDiscovery::Client]
def service_discovery_client
  return @service_discovery_client if @service_discovery_client

  @service_discovery_client = Aws::ServiceDiscovery::Client.new(region: @region)
end

#service_registries ⇒ Array<Hash>

Returns:

  • (Array<Hash>)


105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/hako/schedulers/ecs_service_discovery.rb', line 105

# Builds one service registry hash per configured entry.
#
# Each hash carries the ARN of the existing service discovery service plus the
# entry's `container_name`, `container_port` and `port`; keys whose value is
# nil are left out.
#
# @return [Array<Hash>]
# @raise [Error] if the service of an entry is not found
def service_registries
  @config.map do |entry|
    definition = entry.fetch('service')
    namespace_id = definition.fetch('namespace_id')
    name = definition.fetch('name')
    registered = find_service(namespace_id, name)
    raise Error.new("Service discovery service #{name} not found") unless registered

    registry = {
      container_name: entry['container_name'],
      container_port: entry['container_port'],
      port: entry['port'],
      registry_arn: registered.arn,
    }
    registry.reject { |_key, value| value.nil? }
  end
end

#status(service_registries) ⇒ void

This method returns an undefined value.



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/hako/schedulers/ecs_service_discovery.rb', line 56

# Prints the registered instances of each service registry's service.
#
# The service ID is taken from the part of `registry_arn` after "service/".
# Registries whose service cannot be fetched are skipped. For the others one
# summary line ("<service>.<namespace> instance_count=N") is printed, followed
# by one line per instance listing its ID and attributes.
#
# @param service_registries [Enumerable] objects responding to `registry_arn`
# @return [void]
def status(service_registries)
  service_registries.each do |registry|
    service_id = registry.registry_arn.slice(%r{service/(.+)\z}, 1)
    service = get_service(service_id)
    next unless service

    namespace = get_namespace(service.namespace_id)
    pages = service_discovery_client.list_instances(service_id: service.id)
    instances = pages.flat_map { |page| page.instances }
    puts "  #{service.name}.#{namespace.name} instance_count=#{instances.size}"
    instances.each do |instance|
      pairs = instance.attributes.map { |key, value| "#{key}=#{value}" }
      puts "    #{instance.id} #{pairs.join(', ')}"
    end
  end
end

#update_service(service_id, service) ⇒ Object (private)

Parameters:

  • service_id (String)
  • service (Hash)


188
189
190
191
192
193
194
195
196
197
# File 'lib/hako/schedulers/ecs_service_discovery.rb', line 188

# Updates an existing service discovery service and waits for the resulting
# operation to finish.
#
# @param service_id [String] ID of the service to update
# @param service [Hash] service definition, converted by `update_service_params`
# @return [nil]
# @raise [Error] if the operation finishes with a status other than SUCCESS
def update_service(service_id, service)
  response = service_discovery_client.update_service(
    id: service_id,
    service: update_service_params(service),
  )
  operation = wait_for_operation(response.operation_id)
  return if operation.status == 'SUCCESS'

  raise Error.new("Unable to update service discovery service (#{operation.error_code}): #{operation.error_message}")
end

#update_service_params(service) ⇒ Hash (private)

Parameters:

  • service (Hash)

Returns:

  • (Hash)


201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/hako/schedulers/ecs_service_discovery.rb', line 201

# Builds the parameters that describe an update to a service.
#
# Only the description and the DNS records are included. `dns_config`,
# `dns_config.dns_records` and each record's `type` and `ttl` are required.
#
# @param service [Hash] string-keyed service definition
# @return [Hash] symbol-keyed update parameters
# @raise [KeyError] if a required key is missing
def update_service_params(service)
  dns_settings = service.fetch('dns_config')
  description = service['description']
  records = dns_settings.fetch('dns_records').map do |record|
    { type: record.fetch('type'), ttl: record.fetch('ttl') }
  end
  {
    description: description,
    dns_config: { dns_records: records },
  }
end

#wait_for_operation(operation_id) ⇒ Aws::ServiceDiscovery::Types::Operation (private)

Parameters:

  • operation_id (String)

Returns:

  • (Aws::ServiceDiscovery::Types::Operation)


218
219
220
221
222
223
224
225
# File 'lib/hako/schedulers/ecs_service_discovery.rb', line 218

# Polls an operation until its status is SUCCESS or FAIL.
#
# The operation is fetched immediately and then again every 10 seconds. There
# is no upper bound on the number of polls.
#
# @param operation_id [String]
# @return [Object] the `operation` member of the last get_operation response
def wait_for_operation(operation_id)
  loop do
    current = service_discovery_client.get_operation(operation_id: operation_id).operation
    case current.status
    when 'SUCCESS', 'FAIL'
      break current
    end

    sleep 10
  end
end

#warn_disallowed_service_change(expected_service, actual_service) ⇒ void (private)

This method returns an undefined value.

Parameters:

  • expected_service (Hash)
  • actual_service (Aws::ServiceDiscovery::Types::ServiceSummary)


246
247
248
249
250
251
252
253
254
# File 'lib/hako/schedulers/ecs_service_discovery.rb', line 246

# Warns about configuration changes that are not applied to an existing
# service.
#
# The definition is converted with `create_service_params`; its routing policy
# and custom health check config are then compared with those of the existing
# service, and a warning is logged for each one that differs.
#
# @param expected_service [Hash] service definition from the configuration
# @param actual_service [Aws::ServiceDiscovery::Types::ServiceSummary]
# @return [void]
def warn_disallowed_service_change(expected_service, actual_service)
  desired = create_service_params(expected_service)

  desired_routing_policy = desired.dig(:dns_config, :routing_policy)
  unless desired_routing_policy == actual_service.dns_config.routing_policy
    Hako.logger.warn("Ignoring updated service_discovery.dns_config.routing_policy in the configuration, because AWS doesn't allow updating it for now.")
  end

  # The existing service may have no custom health check at all.
  current_health_check = actual_service.health_check_custom_config&.to_h
  unless desired[:health_check_custom_config] == current_health_check
    Hako.logger.warn("Ignoring updated service_discovery.health_check_custom_config in the configuration, because AWS doesn't allow updating it for now.")
  end
end