Class: Liri::Manager
- Inherits:
-
Object
- Object
- Liri::Manager
- Defined in:
- lib/manager/manager.rb,
lib/manager/credential.rb
Defined Under Namespace
Classes: Credential
Class Method Summary collapse
-
.run(source_code_folder_path, stop = false) ⇒ Object
Inicia la ejecución del Manager.
- .test_files_by_runner ⇒ Object
Instance Method Summary collapse
- #agents_search_processing_enabled ⇒ Object
- #agents_search_processing_enabled=(value) ⇒ Object
- #all_tests ⇒ Object
-
#initialize(udp_port, tcp_port, all_tests, tests_result, manager_folder_path) ⇒ Manager
constructor
A new instance of Manager.
- #print_agents_summary ⇒ Object
- #process_tests_result(agent_ip_address, tests_result, time_in_seconds) ⇒ Object
-
#start_client_socket_to_search_agents(manager_data) ⇒ Object
Inicia un cliente udp que hace un broadcast en toda la red para iniciar una conexión con los Agent que estén escuchando.
-
#start_server_socket_to_process_tests(search_agents_thread) ⇒ Object
Inicia un servidor tcp para procesar los pruebas después de haberse iniciado la conexión a través de udp.
- #test_processing_enabled ⇒ Object
- #tests_batch(agent_ip_address) ⇒ Object
- #update_processing_statuses ⇒ Object
Constructor Details
#initialize(udp_port, tcp_port, all_tests, tests_result, manager_folder_path) ⇒ Manager
Returns a new instance of Manager.
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/manager/manager.rb', line 97 def initialize(udp_port, tcp_port, all_tests, tests_result, manager_folder_path) @udp_port = udp_port @udp_socket = UDPSocket.new @tcp_port = tcp_port @all_tests = all_tests @all_tests_count = all_tests.size @all_tests_results = {} @all_tests_results_count = 0 @all_tests_processing_count = 0 @agents = {} @agents_search_processing_enabled = true @test_processing_enabled = true @tests_batch_number = 0 @tests_batches = {} @tests_result = tests_result @semaphore = Mutex.new @manager_folder_path = manager_folder_path @progressbar = ProgressBar.create(starting_at: 0, total: @all_tests_count, length: 100, format: 'Progress %c/%C |%b=%i| %p%% | %a') end |
Class Method Details
.run(source_code_folder_path, stop = false) ⇒ Object
Inicia la ejecución del Manager
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/manager/manager.rb', line 15 def run(source_code_folder_path, stop = false) return unless valid_project setup_manager = Liri.set_setup(source_code_folder_path) manager_folder_path = setup_manager.manager_folder_path Liri.set_logger(setup_manager.logs_folder_path, 'liri-manager.log') Liri.logger.info("Proceso Manager iniciado") Liri.logger.info("Presione Ctrl + c para terminar el proceso Manager manualmente\n", true) user, password = get_credentials(setup_manager.setup_folder_path) source_code = compress_source_code(source_code_folder_path, manager_folder_path) manager_data = get_manager_data(user, password, manager_folder_path, source_code) all_tests = get_all_tests(source_code) tests_result = Common::TestsResult.new(manager_folder_path) manager = Manager.new(Liri.udp_port, Liri.tcp_port, all_tests, tests_result, manager_folder_path) threads = [] threads << manager.start_client_socket_to_search_agents(manager_data) # Enviar peticiones broadcast a toda la red para encontrar Agents manager.start_server_socket_to_process_tests(threads[0]) unless stop # Esperar y enviar los test unitarios a los Agents Liri.init_exit(stop, threads, 'Manager') Liri.logger.info("Proceso Manager terminado") rescue SignalException => e Liri.logger.info("Exception(#{e}) Proceso Manager terminado manualmente") Liri.kill(threads) end |
Instance Method Details
#agents_search_processing_enabled ⇒ Object
241 242 243 244 245 |
# File 'lib/manager/manager.rb', line 241 def agents_search_processing_enabled @semaphore.synchronize do @agents_search_processing_enabled end end |
#agents_search_processing_enabled=(value) ⇒ Object
235 236 237 238 239 |
# File 'lib/manager/manager.rb', line 235 def agents_search_processing_enabled=(value) @semaphore.synchronize do @agents_search_processing_enabled = value end end |
#all_tests ⇒ Object
229 230 231 232 233 |
# File 'lib/manager/manager.rb', line 229 def all_tests @semaphore.synchronize do @all_tests end end |
#print_agents_summary ⇒ Object
305 306 307 308 309 310 311 312 313 314 315 316 317 318 |
# File 'lib/manager/manager.rb', line 305 def print_agents_summary rows = @agents.values.map { |line| line.values } headings = @agents.values.first.keys table = Terminal::Table.new title: "Resúmen", headings: headings, rows: rows table.style = {padding_left: 3, border_x: "=", border_i: "x" } table.align_column(1, :right) table.align_column(2, :right) table.align_column(3, :right) table.align_column(4, :right) table.align_column(5, :right) puts table end |
#process_tests_result(agent_ip_address, tests_result, time_in_seconds) ⇒ Object
278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 |
# File 'lib/manager/manager.rb', line 278 def process_tests_result(agent_ip_address, tests_result, time_in_seconds) # Se inicia un semáforo para evitar que varios hilos actualicen variables compartidas @semaphore.synchronize do tests_batch_number = tests_result['tests_batch_number'] tests_result_file_name = tests_result['tests_result_file_name'] tests_batch_keys_size = tests_result['tests_batch_keys_size'] #tests_batch_keys = @tests_batches[tests_batch_number][:tests_batch_keys] tests_processed_count = tests_batch_keys_size @all_tests_results_count += tests_processed_count @progressbar.progress = @all_tests_results_count #@tests_batches[tests_batch_number][:tests_result_file_name] = tests_result_file_name tests_result = @tests_result.process(tests_result_file_name) @agents[agent_ip_address][:tests_processed_count] += tests_processed_count @agents[agent_ip_address][:examples] += tests_result[:example_quantity] @agents[agent_ip_address][:failures] += tests_result[:failure_quantity] @agents[agent_ip_address][:time_in_seconds] += time_in_seconds @agents[agent_ip_address][:duration] = @agents[agent_ip_address][:time_in_seconds].to_duration Liri.logger.info("Pruebas procesadas por Agente: #{agent_ip_address}: #{@agents[agent_ip_address][:tests_processed_count]}") end end |
#start_client_socket_to_search_agents(manager_data) ⇒ Object
Inicia un cliente udp que hace un broadcast en toda la red para iniciar una conexión con los Agent que estén escuchando
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/manager/manager.rb', line 124 def start_client_socket_to_search_agents(manager_data) # El cliente udp se ejecuta en bucle dentro de un hilo, esto permite realizar otras tareas mientras este hilo sigue sondeando # la red para obtener mas Agents. Una vez que los tests terminan de ejecutarse, este hilo será finalizado. Thread.new do Liri.logger.info("Buscando Agentes... Espere") Liri.logger.info("Se emite un broadcast cada #{Liri.udp_request_delay} segundos en el puerto UDP: #{@udp_port} (Se mantiene escaneando la red para encontrar Agents) ") while agents_search_processing_enabled @udp_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_BROADCAST, true) @udp_socket.send(manager_data.to_h.to_json, 0, '<broadcast>', @udp_port) sleep(Liri.udp_request_delay) # Se pausa un momento antes de efectuar nuevamente la petición broadcast end end end |
#start_server_socket_to_process_tests(search_agents_thread) ⇒ Object
Inicia un servidor tcp para procesar los pruebas después de haberse iniciado la conexión a través de udp
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 |
# File 'lib/manager/manager.rb', line 141 def start_server_socket_to_process_tests(search_agents_thread) begin tcp_socket = TCPServer.new(@tcp_port) # se hace un bind al puerto dado rescue Errno::EADDRINUSE => e Liri.logger.error("Exception(#{e}) Puerto TCP #{@tcp_port} ocupado.") Thread.kill(search_agents_thread) Thread.exit end Liri.logger.info("En espera para establecer conexión con los Agents en el puerto TCP: #{@tcp_port} (Se espera que algún Agent se conecte para ejecutar las pruebas como respuesta al broadcast UDP) ") # El siguiente bucle permite que varios clientes es decir Agents se conecten # De: http://www.w3big.com/es/ruby/ruby-socket-programming.html while test_processing_enabled Thread.start(tcp_socket.accept) do |client| agent_ip_address = client.remote_address.ip_address response = client.recvfrom(1000).first Liri.logger.info("\nConexión iniciada con el Agente: #{agent_ip_address}") Liri.logger.info("Respuesta al broadcast recibida del Agent: #{agent_ip_address} en el puerto TCP: #{@tcp_port}: #{response}") # Se le indica al agente que proceda client.puts({ msg: 'Recibido', exist_tests: all_tests.any? }.to_json) if all_tests.empty? # No importa lo que le haga, el broadcast udp no se muere al instante y el agente sigue respondiendo # Las siguientes dos lineas son para que se deje de hacer el broadcast pero aun asi se llegan a hacer # 3 a 4 broadcast antes de que se finalize el proceso, al parecer el broadcast va a tener que quedar asi # y mejorar el codigo para que se envien test pendientes para eso hay que llevar una lista de test pendientes # tests enviados sin resultados, tests finalizados, si se recibe respuesta al broadcast se trata de enviar primero test pendientes # luego test enviados sin resultados o sino ignorar Thread.kill(search_agents_thread) agents_search_processing_enabled = false Liri.logger.info("Se termina cualquier proceso pendiente con el Agent #{agent_ip_address} en el puerto TCP: #{@tcp_port}: #{response}") client.close Thread.exit end while all_tests.any? time_in_seconds = Liri::Common::Benchmarking.start(start_msg: "Proceso de Ejecución de pruebas. Agent: #{agent_ip_address}. Espere... ", end_msg: "Proceso de Ejecución de pruebas. Agent: #{agent_ip_address}. Duración: ", stdout: false) do tests_batch = tests_batch(agent_ip_address) break unless tests_batch begin Liri.logger.debug("Conjunto de pruebas enviadas al Agent #{agent_ip_address}: #{tests_batch}") client.puts(tests_batch.to_json) # Se envia el lote de tests response = client.recvfrom(1000).first # Se recibe la respuesta. Cuando mas alto es el parámetro de recvfrom, mas datos se reciben osino se truncan. rescue Errno::EPIPE => e # Esto al parecer se da cuando el Agent ya cerró las conexiones y el Manager intenta contactar Liri.logger.error("Exception(#{e}) El Agent #{agent_ip_address} ya terminó la conexión") # Si el Agente ya no responde es mejor romper el bucle para que no quede colgado break end end # Se captura por si acaso los errores de parseo JSON begin tests_result = JSON.parse(response) Liri.logger.debug("Respuesta del Agent #{agent_ip_address}: #{tests_result}") process_tests_result(agent_ip_address, tests_result, time_in_seconds) rescue JSON::ParserError => e Liri.logger.error("Exception(#{e}) Error de parseo JSON") end end update_processing_statuses Liri.logger.info("Se termina la conexión con el Agent #{agent_ip_address} en el puerto TCP: #{@tcp_port}") begin client.puts('exit') # Se envía el string exit para que el Agent sepa que el proceso terminó client.close # se desconecta el cliente rescue Errno::EPIPE => e # Esto al parecer se da cuando el Agent ya cerró las conexiones y el Manager intenta contactar Liri.logger.error("Exception(#{e}) El Agent #{agent_ip_address} ya terminó la conexión") # Si el Agente ya no responde es mejor terminar el hilo. Aunque igual quedará colgado el Manager # mientras sigan pruebas pendientes Thread.exit end end end Liri.clean_folder_content(@manager_folder_path) @tests_result.print_summary print_agents_summary @tests_result.print_failures if Liri.print_failures end |
#test_processing_enabled ⇒ Object
247 248 249 250 251 |
# File 'lib/manager/manager.rb', line 247 def test_processing_enabled @semaphore.synchronize do @test_processing_enabled end end |
#tests_batch(agent_ip_address) ⇒ Object
260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 |
# File 'lib/manager/manager.rb', line 260 def tests_batch(agent_ip_address) # Se inicia un semáforo para evitar que varios hilos actualicen variables compartidas @semaphore.synchronize do return nil if @all_tests.empty? @tests_batch_number += 1 # Se numera cada lote samples = @all_tests.sample!(Manager.test_files_by_runner) # Se obtiene algunos tests samples_keys = samples.keys # Se obtiene la clave asignada a los tests @all_tests_processing_count += samples_keys.size @agents[agent_ip_address] = { agent_ip_address: agent_ip_address, tests_processed_count: 0, examples: 0, failures: 0, time_in_seconds: 0, duration: '' } unless @agents[agent_ip_address] #@tests_batches[@tests_batch_number] = { agent_ip_address: agent_ip_address, tests_batch_keys: samples_keys } # Se guarda el lote a enviar tests_batch = { tests_batch_number: @tests_batch_number, tests_batch_keys: samples_keys } # Se construye el lote a enviar tests_batch end end |
#update_processing_statuses ⇒ Object
253 254 255 256 257 258 |
# File 'lib/manager/manager.rb', line 253 def update_processing_statuses @semaphore.synchronize do @test_processing_enabled = false if @all_tests_count == @all_tests_results_count @agents_search_processing_enabled = false if @all_tests_count == @all_tests_processing_count end end |