Class: Tribunus

Inherits:
Object
  • Object
show all
Defined in:
lib/roma/tools/tribunus.rb

Defined Under Namespace

Classes: RomadHost

Constant Summary collapse

HEARTBEAT_SECONDS =
300
HEARTBEAT_LOOP_INTERVAL =
50
TIMEOUT_SECONDS =
600

Instance Method Summary collapse

Constructor Details

#initialize(romad_hostname, romad_ports, options = {}) ⇒ Tribunus

Returns a new instance of Tribunus.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/roma/tools/tribunus.rb', line 32

# Sets up the state for managing the romad processes of the local host.
#
# @param romad_hostname hostname passed to RomadHost.new for the local host
# @param romad_ports ports passed to RomadHost.new for the local host
# @param options [Hash] optional overrides. Keys read here:
#   :multicast_addr    (falls back to MULTICAST_ADDR)
#   :udp_port          (falls back to UDP_PORT)
#   :romad_work_dir    (falls back to ROMAD_WORK_DIR)
#   :ruby_command_name (falls back to "ruby")
#   :verbose           (stored as given; presumably consulted by log -- not visible here)
def initialize(romad_hostname, romad_ports, options = {})
  @multi_addr        = options[:multicast_addr] || MULTICAST_ADDR
  @port              = options[:udp_port] || UDP_PORT
  @romad_work_dir    = options[:romad_work_dir] || ROMAD_WORK_DIR
  @ruby_command_name = options[:ruby_command_name] || "ruby"
  @verbose           = options[:verbose]
  log [:initalized, @multi_addr, @port, @romad_work_dir, @ruby_command_name]

  # The local host is created with nil as third argument; remote hosts get a
  # timestamp there (see receive_update_command).
  @local_romad_host = RomadHost.new(romad_hostname, romad_ports, nil)
  @romads           = {} # port => pid
  @remote_servers   = {} # ipaddr => RomadHost
  @threads          = []
  @mutex            = Mutex.new
end

Instance Method Details

#choose_rhost ⇒ Object



191
192
193
194
195
196
197
198
# File 'lib/roma/tools/tribunus.rb', line 191

# Returns the first entry of @remote_servers (in hash order) whose port list
# is not empty, or nil when no such entry exists.
def choose_rhost
  @remote_servers.each_value.find { |rhost| !rhost.ports.empty? }
end

#heartbeat_loop ⇒ Object



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/roma/tools/tribunus.rb', line 144

# Runs forever; once per HEARTBEAT_LOOP_INTERVAL seconds it walks the known
# remote servers and, for each one:
#   * not updated for more than TIMEOUT_SECONDS   -> removes it from @remote_servers
#   * not updated for more than HEARTBEAT_SECONDS -> sends it update_message(0)
#   * otherwise, if it advertises no ports        -> sends it update_message(1)
#
# Each pass works on a snapshot of @remote_servers: receive_update_command
# stores into that hash, and inserting a key while the live hash is being
# iterated raises a RuntimeError in the inserting code.
#
# An error raised during a pass (e.g. a failed send) is printed and the loop
# continues with the next pass instead of ending the heartbeat for good.
def heartbeat_loop
  loop do
    begin
      now = Time.now
      # Hash#to_a is a single call, so the copy is consistent even though
      # receive_update_command writes without taking @mutex.
      snapshot = @mutex.synchronize { @remote_servers.to_a }

      expired, alive = snapshot.partition do |_ipaddr, host|
        host.updated_at + TIMEOUT_SECONDS < now
      end

      # Expired entries are dropped before any network I/O so that a failed
      # send cannot postpone the clean-up.
      @mutex.synchronize do
        expired.each do |ipaddr, host|
          # Only delete the entry that was judged expired; it may have been
          # replaced by a fresh update since the snapshot was taken.
          @remote_servers.delete(ipaddr) if @remote_servers[ipaddr].equal?(host)
        end
      end

      alive.each do |ipaddr, host|
        if host.updated_at + HEARTBEAT_SECONDS < now
          unicast(ipaddr, update_message(0))
        elsif host.ports.empty?
          unicast(ipaddr, update_message(1))
        end
      end
    rescue => e
      p e
    end

    sleep(HEARTBEAT_LOOP_INTERVAL)
  end
end

#join ⇒ Object



220
221
222
# File 'lib/roma/tools/tribunus.rb', line 220

# Blocks until every thread held in @threads has finished.
# Returns @threads.
def join
  @threads.each(&:join)
end

#multicast(msg) ⇒ Object



235
236
237
# File 'lib/roma/tools/tribunus.rb', line 235

# Sends msg to the configured multicast address (@multi_addr) by delegating
# to unicast, and returns whatever unicast returns.
def multicast(msg)
  unicast(@multi_addr, msg)
end

#receive_update_command(ipaddr, reply_count, params) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/roma/tools/tribunus.rb', line 89

# Handles the parameters of an "update" message received from ipaddr.
#
# params is a whitespace-separated string: the sender's hostname followed by
# zero or more port numbers. The sender is recorded in @remote_servers under
# ipaddr with the current time. When reply_count is positive, an update
# message carrying reply_count - 1 is sent back to ipaddr.
#
# Blank params are ignored. Port fields are converted with to_i.
def receive_update_command(ipaddr, reply_count, params)
  hostname, *port_fields = params.strip.split(/\s+/)
  return if hostname.nil?

  ports = port_fields.map(&:to_i)
  @remote_servers[ipaddr] = RomadHost.new(hostname, ports, Time.now)

  unicast(ipaddr, update_message(reply_count - 1)) if reply_count > 0
end

#server_loop ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/roma/tools/tribunus.rb', line 126

# Listens for UDP messages on @port (all interfaces) and dispatches them.
#
# The socket joins the multicast group @multi_addr -- the same address that
# multicast sends to -- so a group configured through the :multicast_addr
# option is honoured for receiving as well as for sending.
#
# Every received message is logged. It is passed to run_command together
# with the sender's IP address only when from_remote? accepts that address.
# Socket.udp_server_loop_on does not return under normal operation.
def server_loop
  sock = UDPSocket.new
  sock.bind('0.0.0.0', @port)

  # ip_mreq: group address followed by the local interface (any).
  membership = IPAddr.new(@multi_addr).hton + IPAddr.new('0.0.0.0').hton
  sock.setsockopt(Socket::IPPROTO_IP, Socket::IP_ADD_MEMBERSHIP, membership)

  log 'start_listen'
  Socket.udp_server_loop_on([sock]) do |msg, msg_src|
    log [:received, msg, msg_src]
    sender = msg_src.remote_address.ip_address
    run_command(sender, msg) if from_remote?(sender)
  end
end

#set_trap ⇒ Object



173
174
175
176
177
# File 'lib/roma/tools/tribunus.rb', line 173

# Installs handlers for INT, TERM and HUP that forward the received signal
# to every spawned romad process (the pids stored in @romads) and then
# terminate this process with exit status 1.
#
# Each pid is signalled on its own: Process.kill raises ArgumentError when
# given no pid at all (nothing spawned yet) and Errno::ESRCH for a pid that
# no longer exists, and neither case may prevent the remaining children from
# being signalled or this process from exiting.
def set_trap
  [:INT, :TERM, :HUP].each do |sig|
    Signal.trap(sig) do
      @romads.values.each do |pid|
        begin
          Process.kill(sig, pid)
        rescue Errno::ESRCH
          # The child has already exited; nothing left to signal.
        end
      end
      exit(1)
    end
  end
end

#spawn_new_roma_ring ⇒ Object



55
56
57
# File 'lib/roma/tools/tribunus.rb', line 55

# Spawns the local romads without any remote node: calls spawn_romads with
# nil for both the remote host and the remote port.
def spawn_new_roma_ring
  spawn_romads(nil, nil)
end

#spawn_romad_join(port, remote_host, remote_port) ⇒ Object



83
84
85
86
# File 'lib/roma/tools/tribunus.rb', line 83

# Spawns one romad process on the given local port, passing
# "-j <remote_host>_<remote_port>" on its command line, and records the
# child's pid in @romads under that port.
#
# The child is started in @romad_work_dir.
#
# @return the pid of the spawned process
def spawn_romad_join(port, remote_host, remote_port)
  join_target = "#{remote_host}_#{remote_port}"
  argv = [
    @ruby_command_name, *RUBY_COMMAND_OPTIONS,
    ROMAD_PATH, *ROMAD_OPTIONS,
    "-p", port.to_s, @local_romad_host.hostname,
    "-j", join_target
  ]
  @romads[port] = Process.spawn(*argv, :chdir => @romad_work_dir)
end

#spawn_romads(remote_host, remote_port) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/roma/tools/tribunus.rb', line 60

# Creates the routing data and then starts one romad per local port.
#
# First the MKROUTE_PATH script is run with one "<hostname>_<port>" argument
# per local port, plus "<remote_host>_<remote_port>" when remote_host is
# given. Once it has finished successfully, a romad process is spawned for
# each local port and its pid stored in @romads under that port. All child
# processes are started in @romad_work_dir.
#
# @raise [RuntimeError] "failed to make route" when the route script ends
#   with a non-zero status
# @return the list of local ports
def spawn_romads(remote_host, remote_port)
  local = @local_romad_host
  spawn_opts = { :chdir => @romad_work_dir }

  nodes = local.ports.map { |port| "#{local.hostname}_#{port}" }
  nodes << "#{remote_host}_#{remote_port}" if remote_host

  mkroute_pid = Process.spawn(@ruby_command_name, *RUBY_COMMAND_OPTIONS, MKROUTE_PATH, *nodes, spawn_opts)
  _, status = Process.waitpid2(mkroute_pid)
  raise "failed to make route" unless status.to_i.zero?

  local.ports.each do |port|
    @romads[port] = Process.spawn(@ruby_command_name, *RUBY_COMMAND_OPTIONS, ROMAD_PATH, *ROMAD_OPTIONS,
                                  "-p", port.to_s, local.hostname, spawn_opts)
  end
end

#spawn_romads_join(remote_host, remote_port) ⇒ Object



76
77
78
79
80
# File 'lib/roma/tools/tribunus.rb', line 76

# Calls spawn_romad_join once for every local port, each time with the same
# remote host and port.
#
# @return [Array] the spawn_romad_join results, in port order
def spawn_romads_join(remote_host, remote_port)
  @local_romad_host.ports.map { |port| spawn_romad_join(port, remote_host, remote_port) }
end

#start_discover ⇒ Object



205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/roma/tools/tribunus.rb', line 205

# Starts the local romads by joining a server found via multicast.
#
# After prepare_to_start and a 0.2 s pause, an initial update message
# (reply_count 1) is multicast. The method then sleeps ten times for 0.3 s
# and asks choose_rhost for a server. The local romads join through that
# server's first port.
#
# When choose_rhost returns nothing, "no server responded" is written to
# $stderr and the process exits with status 1.
def start_discover
  prepare_to_start
  sleep(0.2)
  multicast(update_message(1, true))
  10.times { sleep(0.3) }

  rhost = choose_rhost
  unless rhost
    $stderr.puts "no server responded"
    exit 1
  end

  spawn_romads_join(rhost.hostname, rhost.ports.first)
end

#start_join(host, port) ⇒ Object



199
200
201
202
203
# File 'lib/roma/tools/tribunus.rb', line 199

# Starts the local romads by joining the given host and port: runs
# prepare_to_start, pauses for 0.2 s, then calls spawn_romads_join.
#
# @return the result of spawn_romads_join
def start_join(host, port)
  prepare_to_start
  sleep(0.2)
  spawn_romads_join(host, port)
end

#start_new_ring ⇒ Object



186
187
188
189
# File 'lib/roma/tools/tribunus.rb', line 186

# Starts the local romads as a new ring: runs prepare_to_start and then
# spawn_new_roma_ring.
#
# @return the result of spawn_new_roma_ring
def start_new_ring
  prepare_to_start
  spawn_new_roma_ring
end

#unicast(ipaddr, msg) ⇒ Object



224
225
226
227
228
229
230
231
232
233
# File 'lib/roma/tools/tribunus.rb', line 224

# Sends msg to ipaddr on UDP port @port.
#
# The message is logged first. A fresh UDP socket is used for every call and
# is closed again even when connecting or sending raises.
#
# @return the result of UDPSocket#sendmsg
def unicast(ipaddr, msg)
  log [:message, ipaddr, msg]

  socket = UDPSocket.new
  begin
    socket.connect(ipaddr, @port)
    socket.sendmsg(msg)
  ensure
    socket.close
  end
end

#update_message(reply_count, initial = false) ⇒ Object



116
117
118
119
120
121
122
123
124
# File 'lib/roma/tools/tribunus.rb', line 116

# Builds the text of an "update" message.
#
# The message has the form "update:<reply_count>: <local hostname>". Unless
# initial is true, the keys of @romads (the ports, per the port => pid
# mapping) are appended, separated by single spaces, when there are any.
# The finished message is logged together with @romads.
#
# @return [String] the message
def update_message(reply_count, initial = false)
  ports = @romads.keys
  msg = "update:#{reply_count}: #{@local_romad_host.hostname}"
  msg = "#{msg} #{ports.join(' ')}" unless initial || ports.empty?
  log [:msg, msg, @romads]
  msg
end