Class: Tribunus
- Inherits:
-
Object
- Object
- Tribunus
- Defined in:
- lib/roma/tools/tribunus.rb
Defined Under Namespace
Classes: RomadHost
Constant Summary collapse
- HEARTBEAT_SECONDS =
300
- HEARTBEAT_LOOP_INTERVAL =
50
- TIMEOUT_SECONDS =
600
Instance Method Summary collapse
- #choose_rhost ⇒ Object
- #heartbeat_loop ⇒ Object
-
#initialize(romad_hostname, romad_ports, options = {}) ⇒ Tribunus
constructor
A new instance of Tribunus.
- #join ⇒ Object
- #multicast(msg) ⇒ Object
- #receive_update_command(ipaddr, reply_count, params) ⇒ Object
- #server_loop ⇒ Object
- #set_trap ⇒ Object
- #spawn_new_roma_ring ⇒ Object
- #spawn_romad_join(port, remote_host, remote_port) ⇒ Object
- #spawn_romads(remote_host, remote_port) ⇒ Object
- #spawn_romads_join(remote_host, remote_port) ⇒ Object
- #start_discover ⇒ Object
- #start_join(host, port) ⇒ Object
- #start_new_ring ⇒ Object
- #unicast(ipaddr, msg) ⇒ Object
- #update_message(reply_count, initial = false) ⇒ Object
Constructor Details
#initialize(romad_hostname, romad_ports, options = {}) ⇒ Tribunus
Returns a new instance of Tribunus.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/roma/tools/tribunus.rb', line 32 def initialize(romad_hostname,romad_ports,={}) @multi_addr=[:multicast_addr]||MULTICAST_ADDR @port=[:udp_port]||UDP_PORT @romad_work_dir=[:romad_work_dir]||ROMAD_WORK_DIR @ruby_command_name=[:ruby_command_name]||"ruby" @verbose=[:verbose] log [:initalized,@multi_addr,@port,@romad_work_dir,@ruby_command_name] @threads=[] @romads={} #port => pid @local_romad_host=RomadHost.new(romad_hostname,romad_ports,nil) @mutex=Mutex.new @remote_servers={} #ipaddr => RomadHost end |
Instance Method Details
#choose_rhost ⇒ Object
191 192 193 194 195 196 197 198 |
# File 'lib/roma/tools/tribunus.rb', line 191 def choose_rhost @remote_servers.each do|ipaddr,rhost| unless rhost.ports.empty? return rhost end end nil end |
#heartbeat_loop ⇒ Object
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
# File 'lib/roma/tools/tribunus.rb', line 144 def heartbeat_loop loop do delete_ipaddrs=[] @remote_servers.each do|ipaddr,host| if host.updated_at+TIMEOUT_SECONDS < Time.now delete_ipaddrs << ipaddr elsif host.updated_at+HEARTBEAT_SECONDS < Time.now unicast(ipaddr,(0)) elsif host.ports.empty? unicast(ipaddr,(1)) end end @mutex.synchronize do delete_ipaddrs.each do|ipaddr| @remote_servers.delete(ipaddr) end end sleep(HEARTBEAT_LOOP_INTERVAL) end rescue =>e p e end |
#join ⇒ Object
220 221 222 |
# File 'lib/roma/tools/tribunus.rb', line 220 def join @threads.each{|t|t.join} end |
#multicast(msg) ⇒ Object
235 236 237 |
# File 'lib/roma/tools/tribunus.rb', line 235 def multicast(msg) unicast(@multi_addr,msg) end |
#receive_update_command(ipaddr, reply_count, params) ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/roma/tools/tribunus.rb', line 89 def receive_update_command(ipaddr,reply_count,params) param_ary=params.strip.split(/\s+/) unless param_ary.empty? hostname=param_ary[0] ports=param_ary[1..-1].map{|port| port.to_i} rhost=RomadHost.new(hostname,ports,Time.now) @remote_servers[ipaddr]=rhost if reply_count>0 unicast(ipaddr,(reply_count-1)) end end end |
#server_loop ⇒ Object
126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/roma/tools/tribunus.rb', line 126 def server_loop sock=UDPSocket.new sock.bind('0.0.0.0',@port) sock.setsockopt(Socket::IPPROTO_IP, Socket::IP_ADD_MEMBERSHIP, IPAddr.new(MULTICAST_ADDR).hton+IPAddr.new('0.0.0.0').hton) log 'start_listen' Socket.udp_server_loop_on([sock]) do|msg,msg_src| log [:received ,msg,msg_src] if from_remote?(msg_src.remote_address.ip_address) run_command(msg_src.remote_address.ip_address,msg) end end end |
#set_trap ⇒ Object
173 174 175 176 177 |
# File 'lib/roma/tools/tribunus.rb', line 173 def set_trap [:INT,:TERM,:HUP].each do|sig| Signal.trap(sig){ Process.kill(sig,*@romads.values);exit(1) } end end |
#spawn_new_roma_ring ⇒ Object
55 56 57 |
# File 'lib/roma/tools/tribunus.rb', line 55 def spawn_new_roma_ring spawn_romads(nil,nil) end |
#spawn_romad_join(port, remote_host, remote_port) ⇒ Object
83 84 85 86 |
# File 'lib/roma/tools/tribunus.rb', line 83 def spawn_romad_join(port,remote_host,remote_port) pid=Process.spawn(@ruby_command_name,*RUBY_COMMAND_OPTIONS,ROMAD_PATH,*ROMAD_OPTIONS,"-p",port.to_s,@local_romad_host.hostname,"-j","#{remote_host}_#{remote_port}",:chdir=>@romad_work_dir) @romads[port]=pid end |
#spawn_romads(remote_host, remote_port) ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/roma/tools/tribunus.rb', line 60 def spawn_romads(remote_host,remote_port) nodes=@local_romad_host.ports.map do|port| "#{@local_romad_host.hostname}_#{port}" end nodes << "#{remote_host}_#{remote_port}" if remote_host pid=Process.spawn(@ruby_command_name,*RUBY_COMMAND_OPTIONS,MKROUTE_PATH,*nodes,:chdir=>@romad_work_dir) if(Process.waitpid2(pid)[1].to_i!=0) raise "failed to make route" end @local_romad_host.ports.each do|port| pid=Process.spawn(@ruby_command_name,*RUBY_COMMAND_OPTIONS,ROMAD_PATH,*ROMAD_OPTIONS,"-p",port.to_s,@local_romad_host.hostname, :chdir=>@romad_work_dir) @romads[port]=pid end end |
#spawn_romads_join(remote_host, remote_port) ⇒ Object
76 77 78 79 80 |
# File 'lib/roma/tools/tribunus.rb', line 76 def spawn_romads_join(remote_host,remote_port) @local_romad_host.ports.map do|port| spawn_romad_join(port,remote_host,remote_port) end end |
#start_discover ⇒ Object
205 206 207 208 209 210 211 212 213 214 215 216 217 218 |
# File 'lib/roma/tools/tribunus.rb', line 205 def start_discover prepare_to_start sleep(0.2) multicast((1,true)) 10.times{sleep(0.3)} rhost=choose_rhost if rhost spawn_romads_join(rhost.hostname,rhost.ports.first) else $stderr.puts "no server responded" exit 1 end end |
#start_join(host, port) ⇒ Object
199 200 201 202 203 |
# File 'lib/roma/tools/tribunus.rb', line 199 def start_join(host,port) prepare_to_start sleep(0.2) spawn_romads_join(host,port) end |
#start_new_ring ⇒ Object
186 187 188 189 |
# File 'lib/roma/tools/tribunus.rb', line 186 def start_new_ring prepare_to_start spawn_new_roma_ring end |
#unicast(ipaddr, msg) ⇒ Object
224 225 226 227 228 229 230 231 232 233 |
# File 'lib/roma/tools/tribunus.rb', line 224 def unicast(ipaddr,msg) log [:message, ipaddr,msg] s=UDPSocket.new begin s.connect(ipaddr,@port) s.sendmsg(msg) ensure s.close end end |
#update_message(reply_count, initial = false) ⇒ Object
116 117 118 119 120 121 122 123 124 |
# File 'lib/roma/tools/tribunus.rb', line 116 def (reply_count,initial=false) msg="update:#{reply_count}: #{@local_romad_host.hostname}" if !initial && !@romads.keys.empty? msg+=" " msg+= @romads.keys.join(' ') end log [:msg, msg,@romads] msg end |