Class: Wrapbox::Runner::Ecs

Inherits:
Object
  • Object
show all
Defined in:
lib/wrapbox/runner/ecs.rb,
lib/wrapbox/runner/ecs/task_waiter.rb,
lib/wrapbox/runner/ecs/instance_manager.rb

Defined Under Namespace

Classes: Cli, ContainerAbnormalEnd, ExecutionFailure, ExecutionTimeout, InstanceManager, LackResource, LaunchFailure, Parameter, TaskWaiter

Constant Summary collapse

EXECUTION_RETRY_INTERVAL =
3
WAIT_DELAY =
5
TERM_TIMEOUT =
120
HOST_TERMINATED_REASON_REGEXP =
/Host EC2.*terminated/

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ Ecs

Returns a new instance of Ecs.



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/wrapbox/runner/ecs.rb', line 63

def initialize(options)
  @name = options[:name]
  @task_definition_name = options[:task_definition_name]
  @revision = options[:revision]
  @cluster = options[:cluster]
  @region = options[:region]
  @volumes = options[:volumes]
  @placement_constraints = options[:placement_constraints] || []
  @placement_strategy = options[:placement_strategy]
  @capacity_provider_strategy = options[:capacity_provider_strategy] || []
  @launch_type = options[:launch_type]
  @requires_compatibilities = options[:requires_compatibilities]
  @network_mode = options[:network_mode]
  @network_configuration = options[:network_configuration]
  @cpu = options[:cpu]
  @memory = options[:memory]
  @enable_ecs_managed_tags = options[:enable_ecs_managed_tags]
  @tags = options[:tags]
  @propagate_tags = options[:propagate_tags]
  @enable_execute_command = options[:enable_execute_command]
  if options[:launch_instances]
    @instance_manager = Wrapbox::Runner::Ecs::InstanceManager.new(@cluster, @region, **options[:launch_instances])
  end
  @task_waiter = Wrapbox::Runner::Ecs::TaskWaiter.new(cluster: @cluster, region: @region, delay: WAIT_DELAY)

  @container_definitions = options[:container_definition] ? [options[:container_definition]] : options[:container_definitions] || []
  @container_definitions.concat(options[:additional_container_definitions]) if options[:additional_container_definitions] # deprecated

  if !@container_definitions.empty? && options[:task_definition]
    raise "Please set only one of `container_definition` and `task_definition`"
  end

  if options[:additional_container_definitions] && !options[:additional_container_definitions].empty?
    warn "`additional_container_definitions` is deprecated parameter, Use `container_definitions` instead of it"
  end

  @task_definition_info = options[:task_definition]

  if !@container_definitions.empty?
    @task_definition_name ||= "wrapbox_#{@name}"
    @main_container_name = @container_definitions[0][:name] || @task_definition_name
  elsif @task_definition_info
    @task_definition_name = @task_definition_info[:task_definition_name]
    @main_container_name = @task_definition_info[:main_container_name]
    unless @main_container_name
      raise "Please set `task_definition[:main_container_name]`"
    end
  end

  @container_definitions.each do |d|
    d[:docker_labels]&.stringify_keys!
    d.dig(:log_configuration, :options)&.stringify_keys!
  end

  @task_role_arn = options[:task_role_arn]
  @execution_role_arn = options[:execution_role_arn]
  @logger = Wrapbox.logger
  if options[:log_fetcher]
    type = options[:log_fetcher][:type]
    @log_fetcher = LogFetcher.new(type, **options[:log_fetcher])
  end
end

Instance Attribute Details

#container_definitionsObject (readonly)

Returns the value of attribute container_definitions.



32
33
34
# File 'lib/wrapbox/runner/ecs.rb', line 32

def container_definitions
  @container_definitions
end

#cpuObject (readonly)

Returns the value of attribute cpu.



32
33
34
# File 'lib/wrapbox/runner/ecs.rb', line 32

def cpu
  @cpu
end

#enable_ecs_managed_tagsObject (readonly)

Returns the value of attribute enable_ecs_managed_tags.



32
33
34
# File 'lib/wrapbox/runner/ecs.rb', line 32

def enable_ecs_managed_tags
  @enable_ecs_managed_tags
end

#enable_execute_commandObject (readonly)

Returns the value of attribute enable_execute_command.



32
33
34
# File 'lib/wrapbox/runner/ecs.rb', line 32

def enable_execute_command
  @enable_execute_command
end

#main_container_nameObject (readonly)

Returns the value of attribute main_container_name.



32
33
34
# File 'lib/wrapbox/runner/ecs.rb', line 32

def main_container_name
  @main_container_name
end

#memoryObject (readonly)

Returns the value of attribute memory.



32
33
34
# File 'lib/wrapbox/runner/ecs.rb', line 32

def memory
  @memory
end

#nameObject (readonly)

Returns the value of attribute name.



32
33
34
# File 'lib/wrapbox/runner/ecs.rb', line 32

def name
  @name
end

#network_configurationObject (readonly)

Returns the value of attribute network_configuration.



32
33
34
# File 'lib/wrapbox/runner/ecs.rb', line 32

def network_configuration
  @network_configuration
end

#network_modeObject (readonly)

Returns the value of attribute network_mode.



32
33
34
# File 'lib/wrapbox/runner/ecs.rb', line 32

def network_mode
  @network_mode
end

#placement_constraintsObject (readonly)

Returns the value of attribute placement_constraints.



32
33
34
# File 'lib/wrapbox/runner/ecs.rb', line 32

def placement_constraints
  @placement_constraints
end

#placement_strategyObject (readonly)

Returns the value of attribute placement_strategy.



32
33
34
# File 'lib/wrapbox/runner/ecs.rb', line 32

def placement_strategy
  @placement_strategy
end

#propagate_tagsObject (readonly)

Returns the value of attribute propagate_tags.



32
33
34
# File 'lib/wrapbox/runner/ecs.rb', line 32

def propagate_tags
  @propagate_tags
end

#regionObject (readonly)

Returns the value of attribute region.



32
33
34
# File 'lib/wrapbox/runner/ecs.rb', line 32

def region
  @region
end

#requires_compatibilitiesObject (readonly)

Returns the value of attribute requires_compatibilities.



32
33
34
# File 'lib/wrapbox/runner/ecs.rb', line 32

def requires_compatibilities
  @requires_compatibilities
end

#revisionObject (readonly)

Returns the value of attribute revision.



32
33
34
# File 'lib/wrapbox/runner/ecs.rb', line 32

def revision
  @revision
end

#tagsObject (readonly)

Returns the value of attribute tags.



32
33
34
# File 'lib/wrapbox/runner/ecs.rb', line 32

def tags
  @tags
end

#task_definition_nameObject (readonly)

Returns the value of attribute task_definition_name.



32
33
34
# File 'lib/wrapbox/runner/ecs.rb', line 32

def task_definition_name
  @task_definition_name
end

#volumesObject (readonly)

Returns the value of attribute volumes.



32
33
34
# File 'lib/wrapbox/runner/ecs.rb', line 32

def volumes
  @volumes
end

Class Method Details

.split_overridable_options_and_parameters(options) ⇒ Object



52
53
54
55
56
57
58
59
60
61
# File 'lib/wrapbox/runner/ecs.rb', line 52

def self.split_overridable_options_and_parameters(options)
  opts = options.dup
  overridable_options = {}
  %i[cluster launch_type task_role_arn execution_role_arn tags propagate_tags].each do |key|
    value = opts.delete(key)
    overridable_options[key] = value if value
  end

  [overridable_options, opts]
end

Instance Method Details

#run(class_name, method_name, args, container_definition_overrides: {}, **parameters) ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/wrapbox/runner/ecs.rb', line 145

def run(class_name, method_name, args, container_definition_overrides: {}, **parameters)
  task_definition = prepare_task_definition(container_definition_overrides)
  parameter = Parameter.new(**parameters)

  envs = parameters[:environments] || []
  envs += [
    {
      name: CLASS_NAME_ENV,
      value: class_name.to_s,
    },
    {
      name: METHOD_NAME_ENV,
      value: method_name.to_s,
    },
    {
      name: METHOD_ARGS_ENV,
      value: MultiJson.dump(args),
    },
  ]

  if @instance_manager
    Thread.new { @instance_manager.start_preparing_instances(1) }
  end

  run_task(task_definition.task_definition_arn, ["bundle", "exec", "rake", "wrapbox:run"], envs, parameter)
ensure
  @instance_manager&.terminate_all_instances
end

#run_cmd(cmds, container_definition_overrides: {}, ignore_signal: false, **parameters) ⇒ Object



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/wrapbox/runner/ecs.rb', line 174

def run_cmd(cmds, container_definition_overrides: {}, ignore_signal: false, **parameters)
  ths = []

  task_definition = prepare_task_definition(container_definition_overrides)
  parameter = Parameter.new(**parameters)

  cmds << nil if cmds.empty?

  if @instance_manager
    Thread.new { @instance_manager.start_preparing_instances(cmds.size) }
  end

  cmds.each_with_index do |cmd, idx|
    ths << Thread.new(cmd, idx) do |c, i|
      Thread.current[:cmd_index] = i
      envs = (parameters[:environments] || []) + [{name: "WRAPBOX_CMD_INDEX", value: i.to_s}]
      run_task(task_definition.task_definition_arn, c&.shellsplit, envs, parameter)
    end
  end
  ThreadsWait.all_waits(ths)
  # Raise an error if some threads have an error
  ths.each(&:join)

  true
rescue SignalException => e
  sig = "SIG#{Signal.signame(e.signo)}"
  if ignore_signal
    @logger.info("Receive #{sig} signal. But ECS Tasks continue running")
  else
    @logger.info("Receive #{sig} signal. Stop All tasks")
    ths.each do |th|
      th.report_on_exception = false
      th.raise(e)
    end
    wait_until = Time.now + TERM_TIMEOUT + 15 # thread_timeout_buffer
    ths.each do |th|
      wait = wait_until - Time.now
      th.join(wait) if wait.positive?
    end
  end
  nil
ensure
  @instance_manager&.terminate_all_instances
end