Class: ScbiMapreduce::Manager
- Inherits: Object
  - Object
  - ScbiMapreduce::Manager
- Defined in:
- lib/scbi_mapreduce/manager.rb
Instance Attribute Summary collapse
-
#checkpointing ⇒ Object
Returns the value of attribute checkpointing.
-
#chunk_size ⇒ Object
Returns the value of attribute chunk_size.
-
#exit_on_many_errors ⇒ Object
Returns the value of attribute exit_on_many_errors.
-
#keep_order ⇒ Object
Returns the value of attribute keep_order.
-
#retry_stuck_jobs ⇒ Object
Returns the value of attribute retry_stuck_jobs.
Instance Method Summary collapse
-
#initialize(server_ip, port, workers, work_manager_class, custom_worker_file, log_file = nil, init_env_file = nil) ⇒ Manager
constructor
Initialize the Manager.
- #save_stats(stats = nil, filename = 'scbi_mapreduce_stats.json') ⇒ Object
-
#start_server ⇒ Object
Start an EventMachine loop acting as a server for incoming worker connections.
- #stats ⇒ Object
Constructor Details
#initialize(server_ip, port, workers, work_manager_class, custom_worker_file, log_file = nil, init_env_file = nil) ⇒ Manager
Initialize the Manager.
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/scbi_mapreduce/manager.rb', line 23
# Initialize the Manager.
#
# Sets up the global server logger ($SERVER_LOG) and chooses the local IP
# address to bind to. Splits the requested workers into local and remote
# ones, then builds the WorkerLauncher that #start_server uses to start them.
#
# @param server_ip [String] prefix of the IP to bind to. The first available
#   IPv4 address (plus 127.0.0.1) starting with this string is used; if none
#   matches, '0.0.0.0' is used.
# @param port [Integer] port later passed to EventMachine.start_server by
#   #start_server (presumably 0 lets the OS choose a free port — confirm).
# @param workers [Integer, String, Array<String>] one of:
#   - a number of local workers;
#   - a path to a file with one host name per line;
#   - an array of host names.
#   Host names equal to this machine's `hostname` count as local workers.
#   All other host names are kept as remote worker names.
# @param work_manager_class [Class] passed to EventMachine.start_server by
#   #start_server.
# @param custom_worker_file [String] forwarded to WorkerLauncher.
# @param log_file [String, IO, nil] log destination; defaults to
#   logs/server_log.txt. Missing parent directories of a path are created.
# @param init_env_file [String, nil] forwarded to WorkerLauncher.
# @raise [ArgumentError] if +workers+ is a String that is not an existing file.
def initialize(server_ip, port, workers, work_manager_class, custom_worker_file, log_file = nil, init_env_file = nil)
  @port = port

  log_file ||= File.join('logs', 'server_log.txt')

  # Only filesystem paths need their parent directory created; IO targets
  # such as STDOUT or STDERR are handed to Logger unchanged.
  created_log_dir = false
  if !log_file.is_a?(IO) && !File.exist?(File.dirname(log_file))
    FileUtils.mkdir_p(File.dirname(log_file))
    created_log_dir = true
  end

  # The logger must exist before anything is logged: on the first
  # instantiation $SERVER_LOG is still nil.
  $SERVER_LOG = Logger.new(log_file)
  $SERVER_LOG.datetime_format = "%Y-%m-%d %H:%M:%S"
  $SERVER_LOG.info("Creating logs folder") if created_log_dir

  ip_list = Socket.ip_address_list.select(&:ipv4?).map(&:ip_address)
  # The approach below does not find the Myrinet interface, so
  # Socket.ip_address_list is used instead.
  # ip_list = Socket::getaddrinfo(Socket.gethostname, "echo", Socket::AF_INET).map{|x| x[3]}.uniq
  ip_list << '127.0.0.1'
  $SERVER_LOG.info("Available IPs: #{ip_list}")

  ip = ip_list.find { |one_ip| one_ip.start_with?(server_ip) }
  unless ip
    $SERVER_LOG.info("Ip #{server_ip} not found in available IPs: #{ip_list}")
    ip = '0.0.0.0'
  end
  @ip = ip

  @checkpointing = false
  @keep_order = false
  @retry_stuck_jobs = false
  @exit_on_many_errors = true
  @chunk_size = 1

  @worker_names = []
  if workers.is_a?(Integer)
    @workers = workers
  else
    # workers is either a file with one host name per line, or an array.
    if workers.is_a?(String)
      raise ArgumentError, "Workers file not found: #{workers}" unless File.exist?(workers)

      $SERVER_LOG.info("Loading workers file: #{workers}")
      workers = File.read(workers).lines.map(&:strip).reject(&:empty?)
    end

    host_name = `hostname`.chomp
    @workers = workers.count(host_name)
    # Build a new array rather than calling delete, so the caller's array
    # is left untouched.
    @worker_names = workers.reject { |name| name == host_name }
  end

  @work_manager_class = work_manager_class
  # The real listening port is only known after EventMachine binds the
  # socket; #start_server forwards it through worker_launcher.server_port=.
  @worker_launcher = WorkerLauncher.new(@ip, 0, ip_list, @workers, custom_worker_file, log_file, init_env_file)

  $SERVER_LOG.info("Local workers: #{@workers}")
  $SERVER_LOG.info("Remote workers: #{@worker_names}")
end
Instance Attribute Details
#checkpointing ⇒ Object
Returns the value of attribute checkpointing.
20 21 22 |
# File 'lib/scbi_mapreduce/manager.rb', line 20
# Reader for the checkpointing flag (initialized to false in #initialize).
#
# @return [Object] the current value of @checkpointing.
def checkpointing
  @checkpointing
end
#chunk_size ⇒ Object
Returns the value of attribute chunk_size.
20 21 22 |
# File 'lib/scbi_mapreduce/manager.rb', line 20
# Reader for the chunk size setting (initialized to 1 in #initialize).
#
# @return [Object] the current value of @chunk_size.
def chunk_size
  @chunk_size
end
#exit_on_many_errors ⇒ Object
Returns the value of attribute exit_on_many_errors.
20 21 22 |
# File 'lib/scbi_mapreduce/manager.rb', line 20
# Reader for the exit-on-many-errors flag (initialized to true in #initialize).
#
# @return [Object] the current value of @exit_on_many_errors.
def exit_on_many_errors
  @exit_on_many_errors
end
#keep_order ⇒ Object
Returns the value of attribute keep_order.
20 21 22 |
# File 'lib/scbi_mapreduce/manager.rb', line 20
# Reader for the keep-order flag (initialized to false in #initialize).
#
# @return [Object] the current value of @keep_order.
def keep_order
  @keep_order
end
#retry_stuck_jobs ⇒ Object
Returns the value of attribute retry_stuck_jobs.
20 21 22 |
# File 'lib/scbi_mapreduce/manager.rb', line 20
# Reader for the retry-stuck-jobs flag (initialized to false in #initialize).
#
# @return [Object] the current value of @retry_stuck_jobs.
def retry_stuck_jobs
  @retry_stuck_jobs
end
Instance Method Details
#save_stats(stats = nil, filename = 'scbi_mapreduce_stats.json') ⇒ Object
139 140 141 |
# File 'lib/scbi_mapreduce/manager.rb', line 139
# Persist statistics by delegating to the work manager class.
#
# @param stats [Object, nil] statistics to save. nil is forwarded unchanged;
#   how the work manager treats nil is not visible here.
# @param filename [String] destination file name.
# @return whatever @work_manager_class.save_stats returns.
def save_stats(stats = nil, filename = 'scbi_mapreduce_stats.json')
  manager = @work_manager_class
  manager.save_stats(stats, filename)
end
#start_server ⇒ Object
Start an EventMachine loop acting as a server for incoming worker connections
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/scbi_mapreduce/manager.rb', line 100
# Start an EventMachine loop acting as a server for incoming worker connections.
#
# Inside the loop this method:
#   1. initializes the work manager class with the current options;
#   2. binds the server socket to @ip/@port;
#   3. reads back the address that was actually bound and stores it in
#      @ip/@port;
#   4. launches the local and remote workers against that port.
#
# An exception escaping the loop is logged, and
# @work_manager_class.end_work_manager is called. The exception is not
# re-raised.
def start_server
  # Set a custom error handler; otherwise errors raised inside EventMachine
  # callbacks are silently ignored. backtrace may be nil for exceptions that
  # were never raised, hence Array().
  EM.error_handler do |e|
    $SERVER_LOG.error("#{e.message} => #{Array(e.backtrace).join("\n")}")
  end

  # $SERVER_LOG.info("Installing INT and TERM traps in #{@work_manager_class}")
  # Signal.trap("INT") { puts "TRAP INT";@work_manager_class.controlled_exit; EM.stop}
  # Signal.trap("TERM") { puts "TRAP TERM";@work_manager_class.controlled_exit; EM.stop}

  # Start the EM loop.
  EventMachine.run do
    @work_manager_class.init_work_manager_internals(@checkpointing, @keep_order, @retry_stuck_jobs, @exit_on_many_errors, @chunk_size)

    evm = EventMachine.start_server(@ip, @port, @work_manager_class)
    # unpack_sockaddr_in returns [port, ip] for the socket actually bound.
    bound_port, bound_ip = Socket.unpack_sockaddr_in(EM.get_sockname(evm))
    @port = bound_port.to_i
    @ip = bound_ip.to_s

    $SERVER_LOG.info "Server running at : [#{@ip}:#{@port}]"

    @worker_launcher.server_port = @port
    @worker_launcher.launch_workers
    @worker_launcher.launch_external_workers(@worker_names)
  end
# NOTE(review): rescuing Exception (not StandardError) also catches
# Interrupt/SystemExit. It is kept so end_work_manager runs on every abnormal exit.
rescue Exception => e
  $SERVER_LOG.error("Exiting server due to exception:\n#{e.message}\n#{Array(e.backtrace).join("\n")}")
  @work_manager_class.end_work_manager
end
#stats ⇒ Object
135 136 137 |
# File 'lib/scbi_mapreduce/manager.rb', line 135
# Return the statistics held by the work manager class.
#
# @return whatever @work_manager_class.stats returns.
def stats
  manager = @work_manager_class
  manager.stats
end