Class: Liri::Manager

Inherits:
Object
  • Object
show all
Defined in:
lib/manager/manager.rb,
lib/manager/credential.rb

Defined Under Namespace

Classes: Credential

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(udp_port, tcp_port, all_tests, tests_result, manager_folder_path) ⇒ Manager

Returns a new instance of Manager.



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/manager/manager.rb', line 97

def initialize(udp_port, tcp_port, all_tests, tests_result, manager_folder_path)
  @udp_port = udp_port
  @udp_socket = UDPSocket.new
  @tcp_port = tcp_port

  @all_tests = all_tests
  @all_tests_count = all_tests.size
  @all_tests_results = {}
  @all_tests_results_count = 0
  @all_tests_processing_count = 0
  @agents = {}

  @agents_search_processing_enabled = true
  @test_processing_enabled = true

  @tests_batch_number = 0
  @tests_batches = {}

  @tests_result = tests_result
  @semaphore = Mutex.new

  @manager_folder_path = manager_folder_path

  @progressbar = ProgressBar.create(starting_at: 0, total: @all_tests_count, length: 100, format: 'Progress %c/%C |%b=%i| %p%% | %a')
end

Class Method Details

.run(source_code_folder_path, stop = false) ⇒ Object

Inicia la ejecución del Manager

Parameters:

  • stop (Boolean) (defaults to: false)

    el valor true es para que no se ejecute infinitamente el método en el test unitario.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/manager/manager.rb', line 15

def run(source_code_folder_path, stop = false)
  return unless valid_project

  setup_manager = Liri.set_setup(source_code_folder_path)
  manager_folder_path = setup_manager.manager_folder_path

  Liri.set_logger(setup_manager.logs_folder_path, 'liri-manager.log')
  Liri.logger.info("Proceso Manager iniciado")
  Liri.logger.info("Presione Ctrl + c para terminar el proceso Manager manualmente\n", true)

  user, password = get_credentials(setup_manager.setup_folder_path)
  source_code = compress_source_code(source_code_folder_path, manager_folder_path)
  manager_data = get_manager_data(user, password, manager_folder_path, source_code)
  all_tests = get_all_tests(source_code)
  tests_result = Common::TestsResult.new(manager_folder_path)

  manager = Manager.new(Liri.udp_port, Liri.tcp_port, all_tests, tests_result, manager_folder_path)

  threads = []
  threads << manager.start_client_socket_to_search_agents(manager_data) # Enviar peticiones broadcast a toda la red para encontrar Agents
  manager.start_server_socket_to_process_tests(threads[0]) unless stop # Esperar y enviar los test unitarios a los Agents

  Liri.init_exit(stop, threads, 'Manager')
  Liri.logger.info("Proceso Manager terminado")
rescue SignalException => e
  Liri.logger.info("Exception(#{e}) Proceso Manager terminado manualmente")
  Liri.kill(threads)
end

.test_files_by_runnerObject



44
45
46
# File 'lib/manager/manager.rb', line 44

def test_files_by_runner
  Liri.setup.test_files_by_runner
end

Instance Method Details

#agents_search_processing_enabledObject



241
242
243
244
245
# File 'lib/manager/manager.rb', line 241

def agents_search_processing_enabled
  @semaphore.synchronize do
    @agents_search_processing_enabled
  end
end

#agents_search_processing_enabled=(value) ⇒ Object



235
236
237
238
239
# File 'lib/manager/manager.rb', line 235

def agents_search_processing_enabled=(value)
  @semaphore.synchronize do
    @agents_search_processing_enabled = value
  end
end

#all_testsObject



229
230
231
232
233
# File 'lib/manager/manager.rb', line 229

def all_tests
  @semaphore.synchronize do
    @all_tests
  end
end


305
306
307
308
309
310
311
312
313
314
315
316
317
318
# File 'lib/manager/manager.rb', line 305

def print_agents_summary
  rows = @agents.values.map { |line| line.values }
  headings = @agents.values.first.keys

  table = Terminal::Table.new title: "Resúmen", headings: headings, rows: rows
  table.style = {padding_left: 3, border_x: "=", border_i: "x" }
  table.align_column(1, :right)
  table.align_column(2, :right)
  table.align_column(3, :right)
  table.align_column(4, :right)
  table.align_column(5, :right)

  puts table
end

#process_tests_result(agent_ip_address, tests_result, time_in_seconds) ⇒ Object



278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
# File 'lib/manager/manager.rb', line 278

def process_tests_result(agent_ip_address, tests_result, time_in_seconds)
  # Se inicia un semáforo para evitar que varios hilos actualicen variables compartidas
  @semaphore.synchronize do
    tests_batch_number = tests_result['tests_batch_number']
    tests_result_file_name = tests_result['tests_result_file_name']
    tests_batch_keys_size = tests_result['tests_batch_keys_size']

    #tests_batch_keys = @tests_batches[tests_batch_number][:tests_batch_keys]
    tests_processed_count = tests_batch_keys_size
    @all_tests_results_count += tests_processed_count

    @progressbar.progress = @all_tests_results_count

    #@tests_batches[tests_batch_number][:tests_result_file_name] = tests_result_file_name

    tests_result = @tests_result.process(tests_result_file_name)

    @agents[agent_ip_address][:tests_processed_count] += tests_processed_count
    @agents[agent_ip_address][:examples] += tests_result[:example_quantity]
    @agents[agent_ip_address][:failures] += tests_result[:failure_quantity]
    @agents[agent_ip_address][:time_in_seconds] += time_in_seconds
    @agents[agent_ip_address][:duration] = @agents[agent_ip_address][:time_in_seconds].to_duration

    Liri.logger.info("Pruebas procesadas por Agente: #{agent_ip_address}: #{@agents[agent_ip_address][:tests_processed_count]}")
  end
end

#start_client_socket_to_search_agents(manager_data) ⇒ Object

Inicia un cliente udp que hace un broadcast en toda la red para iniciar una conexión con los Agent que estén escuchando



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/manager/manager.rb', line 124

def start_client_socket_to_search_agents(manager_data)
  # El cliente udp se ejecuta en bucle dentro de un hilo, esto permite realizar otras tareas mientras este hilo sigue sondeando
  # la red para obtener mas Agents. Una vez que los tests terminan de ejecutarse, este hilo será finalizado.
  Thread.new do
    Liri.logger.info("Buscando Agentes... Espere")
    Liri.logger.info("Se emite un broadcast cada #{Liri.udp_request_delay} segundos en el puerto UDP: #{@udp_port}
                                 (Se mantiene escaneando la red para encontrar Agents)
    ")
    while agents_search_processing_enabled
      @udp_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_BROADCAST, true)
      @udp_socket.send(manager_data.to_h.to_json, 0, '<broadcast>', @udp_port)
      sleep(Liri.udp_request_delay) # Se pausa un momento antes de efectuar nuevamente la petición broadcast
    end
  end
end

#start_server_socket_to_process_tests(search_agents_thread) ⇒ Object

Inicia un servidor tcp para procesar los pruebas después de haberse iniciado la conexión a través de udp



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/manager/manager.rb', line 141

def start_server_socket_to_process_tests(search_agents_thread)
  begin
    tcp_socket = TCPServer.new(@tcp_port) # se hace un bind al puerto dado
  rescue Errno::EADDRINUSE => e
    Liri.logger.error("Exception(#{e}) Puerto TCP #{@tcp_port} ocupado.")
    Thread.kill(search_agents_thread)
    Thread.exit
  end

  Liri.logger.info("En espera para establecer conexión con los Agents en el puerto TCP: #{@tcp_port}
                                 (Se espera que algún Agent se conecte para ejecutar las pruebas como respuesta al broadcast UDP)
  ")
  # El siguiente bucle permite que varios clientes es decir Agents se conecten
  # De: http://www.w3big.com/es/ruby/ruby-socket-programming.html
  while test_processing_enabled
    Thread.start(tcp_socket.accept) do |client|
      agent_ip_address = client.remote_address.ip_address
      response = client.recvfrom(1000).first

      Liri.logger.info("\nConexión iniciada con el Agente: #{agent_ip_address}")
      Liri.logger.info("Respuesta al broadcast recibida del Agent: #{agent_ip_address} en el puerto TCP: #{@tcp_port}:  #{response}")

      # Se le indica al agente que proceda
      client.puts({ msg: 'Recibido', exist_tests: all_tests.any? }.to_json)

      if all_tests.empty?
        # No importa lo que le haga, el broadcast udp no se muere al instante y el agente sigue respondiendo
        # Las siguientes dos lineas son para que se deje de hacer el broadcast pero aun asi se llegan a hacer
        # 3 a 4 broadcast antes de que se finalize el proceso, al parecer el broadcast va a tener que quedar asi
        # y mejorar el codigo para que se envien test pendientes para eso hay que llevar una lista de test pendientes
        # tests enviados sin resultados, tests finalizados, si se recibe respuesta al broadcast se trata de enviar primero test pendientes
        # luego test enviados sin resultados o sino ignorar
        Thread.kill(search_agents_thread)
        agents_search_processing_enabled = false
        Liri.logger.info("Se termina cualquier proceso pendiente con el Agent #{agent_ip_address} en el puerto TCP: #{@tcp_port}: #{response}")
        client.close
        Thread.exit
      end

      while all_tests.any?
        time_in_seconds = Liri::Common::Benchmarking.start(start_msg: "Proceso de Ejecución de pruebas. Agent: #{agent_ip_address}. Espere... ", end_msg: "Proceso de Ejecución de pruebas. Agent: #{agent_ip_address}. Duración: ", stdout: false) do
          tests_batch = tests_batch(agent_ip_address)
          break unless tests_batch

          begin
            Liri.logger.debug("Conjunto de pruebas enviadas al Agent #{agent_ip_address}: #{tests_batch}")

            client.puts(tests_batch.to_json) # Se envia el lote de tests
            response = client.recvfrom(1000).first # Se recibe la respuesta. Cuando mas alto es el parámetro de recvfrom, mas datos se reciben osino se truncan.
          rescue Errno::EPIPE => e
            # Esto al parecer se da cuando el Agent ya cerró las conexiones y el Manager intenta contactar
            Liri.logger.error("Exception(#{e}) El Agent #{agent_ip_address} ya terminó la conexión")
            # Si el Agente ya no responde es mejor romper el bucle para que no quede colgado
            break
          end
        end

        # Se captura por si acaso los errores de parseo JSON
        begin
          tests_result = JSON.parse(response)
          Liri.logger.debug("Respuesta del Agent #{agent_ip_address}: #{tests_result}")
          process_tests_result(agent_ip_address, tests_result, time_in_seconds)
        rescue JSON::ParserError => e
          Liri.logger.error("Exception(#{e}) Error de parseo JSON")
        end
      end

      update_processing_statuses
      Liri.logger.info("Se termina la conexión con el Agent #{agent_ip_address} en el puerto TCP: #{@tcp_port}")
      begin
        client.puts('exit') # Se envía el string exit para que el Agent sepa que el proceso terminó
        client.close # se desconecta el cliente
      rescue Errno::EPIPE => e
        # Esto al parecer se da cuando el Agent ya cerró las conexiones y el Manager intenta contactar
        Liri.logger.error("Exception(#{e}) El Agent #{agent_ip_address} ya terminó la conexión")
     # Si el Agente ya no responde es mejor terminar el hilo. Aunque igual quedará colgado el Manager
     # mientras sigan pruebas pendientes
        Thread.exit
      end
    end
  end

  Liri.clean_folder_content(@manager_folder_path)
  @tests_result.print_summary
  print_agents_summary
  @tests_result.print_failures if Liri.print_failures
end

#test_processing_enabledObject



247
248
249
250
251
# File 'lib/manager/manager.rb', line 247

def test_processing_enabled
  @semaphore.synchronize do
    @test_processing_enabled
  end
end

#tests_batch(agent_ip_address) ⇒ Object



260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
# File 'lib/manager/manager.rb', line 260

def tests_batch(agent_ip_address)
  # Se inicia un semáforo para evitar que varios hilos actualicen variables compartidas
  @semaphore.synchronize do
    return nil if @all_tests.empty?

    @tests_batch_number += 1 # Se numera cada lote
    samples = @all_tests.sample!(Manager.test_files_by_runner) # Se obtiene algunos tests
    samples_keys = samples.keys # Se obtiene la clave asignada a los tests
    @all_tests_processing_count += samples_keys.size

    @agents[agent_ip_address] = { agent_ip_address: agent_ip_address, tests_processed_count: 0, examples: 0, failures: 0, time_in_seconds: 0, duration: '' } unless @agents[agent_ip_address]

    #@tests_batches[@tests_batch_number] = { agent_ip_address: agent_ip_address, tests_batch_keys: samples_keys } # Se guarda el lote a enviar
    tests_batch = { tests_batch_number: @tests_batch_number, tests_batch_keys: samples_keys } # Se construye el lote a enviar
    tests_batch
  end
end

#update_processing_statusesObject



253
254
255
256
257
258
# File 'lib/manager/manager.rb', line 253

def update_processing_statuses
  @semaphore.synchronize do
    @test_processing_enabled = false if @all_tests_count == @all_tests_results_count
    @agents_search_processing_enabled = false if @all_tests_count == @all_tests_processing_count
  end
end