Class: Rinda::RingServer

Inherits:
Object
Includes:
DRbUndumped
Defined in:
lib/rinda2/ring/server.rb

Overview

A RingServer allows a Rinda::TupleSpace to be located via UDP broadcasts. Default service location uses the following steps:

  1. A RingServer begins listening on the network broadcast UDP address.

  2. A RingFinger sends a UDP packet containing the DRb URI where it will listen for a reply.

  3. The RingServer receives the UDP packet and connects back to the provided DRb URI with the DRb service.
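
The exchange above can be sketched without any networking. Judging from do_write and do_reply as listed on this page, a lookup packet appears to be a marshalled pair: a [:lookup_ring, reply object] tuple plus a lifetime in seconds. ReplyRef is a hypothetical stand-in for the DRb reference a real client would send, and both helper names are illustrative rather than part of Rinda.

```ruby
# Hypothetical stand-in for the DRb reference a real client would send.
ReplyRef = Struct.new(:uri)

# Client side (step 2): marshal the lookup tuple together with its lifetime.
def build_lookup_packet(reply_uri, lifetime_sec)
  Marshal.dump([[:lookup_ring, ReplyRef.new(reply_uri)], lifetime_sec])
end

# Server side (step 3): recover the tuple and lifetime, as do_write does.
def decode_lookup_packet(msg)
  tuple, sec = Marshal.load(msg)
  { tuple: tuple, sec: sec }
end

packet  = build_lookup_packet('druby://localhost:12345', 5)
decoded = decode_lookup_packet(packet)
puts decoded[:tuple].first # prints lookup_ring
```

In the real service the second tuple element is a DRb object, so calling it in step 3 reaches back to the client over DRb.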

A RingServer requires a TupleSpace:

ts = Rinda::TupleSpace.new
rs = Rinda::RingServer.new ts

RingServer can also listen on multicast addresses for announcements. This allows multiple RingServers to run on the same host. To use network broadcast and multicast:

ts = Rinda::TupleSpace.new
rs = Rinda::RingServer.new ts, [Socket::INADDR_ANY, '239.0.0.1', 'ff02::1']

Defined Under Namespace

Classes: Renewer

Constructor Details

#initialize(ts, addresses = [Socket::INADDR_ANY], port = Ring_PORT) ⇒ RingServer

Advertises ts on the given addresses at port.

If addresses is omitted only the UDP broadcast address is used.

addresses can contain multiple addresses. If a multicast address is given in addresses then the RingServer will listen for multicast queries.

If you use IPv4 multicast you may need to set an address of the inbound interface which joins a multicast group.

ts = Rinda::TupleSpace.new
rs = Rinda::RingServer.new(ts, [['239.0.0.1', '9.5.1.1']])

An entry in addresses can itself be an Array. The first element of the Array is a multicast address and the second is an inbound interface address. If the second is omitted then ‘0.0.0.0’ is used.
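
How the constructor interprets its arguments can be sketched as a pure function, based on the initialize source listed on this page. plan_sockets and ASSUMED_RING_PORT are hypothetical names for this illustration, and 7647 is assumed to be the value of Ring_PORT. The sketch only returns the argument lists that would be passed to make_socket and opens no sockets.

```ruby
require 'socket'

ASSUMED_RING_PORT = 7647 # assumed value of Rinda::Ring_PORT

# Returns [port, list of argument lists that would go to make_socket].
def plan_sockets(addresses = [Socket::INADDR_ANY], port = ASSUMED_RING_PORT)
  # A bare Integer in the first position is treated as the port.
  if Integer === addresses
    addresses, port = [Socket::INADDR_ANY], addresses
  end

  # Array entries are splatted; plain entries become one-argument calls.
  calls = addresses.map { |address| Array === address ? address : [address] }
  [port, calls]
end

p plan_sockets(9000)
p plan_sockets([['239.0.0.1', '9.5.1.1'], 'ff02::1'])
```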

If you use IPv6 multicast you may need to set both the local interface address and the inbound interface index:

rs = Rinda::RingServer.new(ts, [['ff02::1', '::1', 1]])

The first element is a multicast address and the second is an inbound interface address. The third is an inbound interface index.

At this time there is no easy way to get an interface index by name.

If the second is omitted then ‘::1’ is used. If the third is omitted then 0 (default interface) is used.
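
On Ruby versions that provide Socket.getifaddrs, an interface index can usually be looked up by name. The following is a minimal sketch under that assumption; interface_index is a hypothetical helper, not part of Rinda, and the call may be unavailable on some platforms.

```ruby
require 'socket'

# Returns the index of the named interface, or nil when it is not found.
def interface_index(name)
  ifaddr = Socket.getifaddrs.find { |ifa| ifa.name == name }
  ifaddr && ifaddr.ifindex
end
```

The result could then be passed as the third element of an IPv6 entry, for example `['ff02::1', '::1', interface_index('eth0')]`, where 'eth0' is only an example name.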



# File 'lib/rinda2/ring/server.rb', line 81

def initialize(ts, addresses=[Socket::INADDR_ANY], port=Ring_PORT)
  @port = port

  if Integer === addresses then
    addresses, @port = [Socket::INADDR_ANY], addresses
  end

  @renewer = Renewer.new

  @ts = ts
  @sockets = []
  addresses.each do |address|
    if Array === address
      make_socket(*address)
    else
      make_socket(address)
    end
  end

  @w_services = write_services
  @r_service  = reply_service
end

Instance Method Details

#do_reply ⇒ Object

Pulls lookup tuples out of the TupleSpace and sends their DRb object the address of the local TupleSpace.



# File 'lib/rinda2/ring/server.rb', line 201

def do_reply
  tuple = @ts.take([:lookup_ring, nil], @renewer)
  Thread.new { tuple[1].call(@ts) rescue nil}
rescue
end
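
The take-and-call step can be tried in a single process. This is a rough sketch that assumes the rinda library is installed; a lambda stands in for the remote DRb object, and reply_once and lookup_round_trip are hypothetical helpers. Unlike do_reply, it runs the callback inline and lets errors propagate.

```ruby
require 'rinda/tuplespace'

# Takes one lookup tuple and hands the TupleSpace to its callback,
# mirroring the body of do_reply without the background thread.
def reply_once(ts, timeout_sec = 1)
  tuple = ts.take([:lookup_ring, nil], timeout_sec)
  tuple[1].call(ts)
end

# Writes a lookup tuple, answers it, and returns what the callback received.
def lookup_round_trip
  ts = Rinda::TupleSpace.new
  found = []
  # A lambda stands in for the remote DRb object (assumption for this sketch).
  ts.write([:lookup_ring, ->(space) { found << space }], 5)
  reply_once(ts)
  [ts, found]
end
```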

#do_write(msg) ⇒ Object

Extracts the response URI from msg and adds it to the TupleSpace, where it will be picked up by reply_service for notification.



# File 'lib/rinda2/ring/server.rb', line 176

def do_write(msg)
  Thread.new do
    begin
      tuple, sec = Marshal.load(msg)
      @ts.write(tuple, sec)
    rescue
    end
  end
end

#make_socket(address, interface_address = nil, multicast_interface = 0) ⇒ Object

Creates a socket at address.

If address is a multicast address then interface_address and multicast_interface may optionally be given.

A created socket is bound to interface_address. If you use IPv4 multicast then the interface of interface_address is used as the inbound interface. If interface_address is omitted or nil then ‘0.0.0.0’ (IPv4) or ‘::1’ (IPv6) is used.

If you use IPv6 multicast then multicast_interface is used as the inbound interface. multicast_interface is a network interface index. If multicast_interface is omitted then 0 (default interface) is used.



# File 'lib/rinda2/ring/server.rb', line 119

def make_socket(address, interface_address=nil, multicast_interface=0)
  addrinfo = Addrinfo.udp(address, @port)

  socket = Socket.new(addrinfo.pfamily, addrinfo.socktype,
                      addrinfo.protocol)
  @sockets << socket

  if addrinfo.ipv4_multicast? or addrinfo.ipv6_multicast? then
    if Socket.const_defined?(:SO_REUSEPORT) then
      socket.setsockopt(:SOCKET, :SO_REUSEPORT, true)
    else
      socket.setsockopt(:SOCKET, :SO_REUSEADDR, true)
    end

    if addrinfo.ipv4_multicast? then
      interface_address = '0.0.0.0' if interface_address.nil?
      socket.bind(Addrinfo.udp('0.0.0.0', @port))

      mreq = IPAddr.new(addrinfo.ip_address).hton +
        IPAddr.new(interface_address).hton

      socket.setsockopt(:IPPROTO_IP, :IP_ADD_MEMBERSHIP, mreq)
    else
      interface_address = '::1' if interface_address.nil?
      socket.bind(Addrinfo.udp(interface_address, @port))

      mreq = IPAddr.new(addrinfo.ip_address).hton +
        [multicast_interface].pack('I')

      socket.setsockopt(:IPPROTO_IPV6, :IPV6_JOIN_GROUP, mreq)
    end
  else
    socket.bind(addrinfo)
  end

  socket
end
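
The two membership requests built in make_socket can be inspected on their own. The sketch below reuses the same IPAddr#hton and Array#pack calls as the listing; ipv4_mreq and ipv6_mreq are hypothetical helper names introduced only for illustration.

```ruby
require 'ipaddr'

# IPv4: 4-byte group address followed by the 4-byte interface address.
def ipv4_mreq(group, interface_address = '0.0.0.0')
  IPAddr.new(group).hton + IPAddr.new(interface_address).hton
end

# IPv6: 16-byte group address followed by a native-endian interface index.
def ipv6_mreq(group, interface_index = 0)
  IPAddr.new(group).hton + [interface_index].pack('I')
end

p ipv4_mreq('239.0.0.1', '9.5.1.1').bytes
p ipv6_mreq('ff02::1', 1).bytesize
```

The IPv4 request carries the interface as an address, while the IPv6 request carries it as an index, which is why the constructor accepts a third element only for IPv6 entries.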

#reply_service ⇒ Object

Creates a thread that notifies waiting clients from the TupleSpace.



# File 'lib/rinda2/ring/server.rb', line 189

def reply_service
  Thread.new do
    loop do
      do_reply
    end
  end
end

#shutdown ⇒ Object

Shuts down the RingServer.



# File 'lib/rinda2/ring/server.rb', line 210

def shutdown
  @renewer.renew = false

  @w_services.each do |thread|
    thread.kill
    thread.join
  end

  @sockets.each do |socket|
    socket.close
  end

  @r_service.kill
  @r_service.join
end

#write_services ⇒ Object

Creates threads that pick up UDP packets and pass them to do_write for decoding.



# File 'lib/rinda2/ring/server.rb', line 161

def write_services
  @sockets.map do |s|
    Thread.new(s) do |socket|
      loop do
        msg = socket.recv(9000)
        do_write(msg)
      end
    end
  end
end
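
The receive loop can be exercised over loopback with plain UDP sockets. This is a minimal sketch, assuming 127.0.0.1 is usable on the host; start_receiver and loopback_round_trip are hypothetical helpers, and a Queue takes the place of do_write.

```ruby
require 'socket'
require 'timeout'

# Starts a thread that forwards every received datagram to the inbox queue,
# in the same style as the loop in write_services.
def start_receiver(socket, inbox, max_len = 9000)
  Thread.new do
    loop do
      inbox << socket.recv(max_len)
    end
  end
end

# Sends one datagram over loopback and returns what the receiver saw.
def loopback_round_trip(payload)
  receiver = UDPSocket.new
  receiver.bind('127.0.0.1', 0) # port 0 lets the OS pick a free port
  port = receiver.addr[1]
  inbox = Queue.new
  thread = start_receiver(receiver, inbox)

  sender = UDPSocket.new
  sender.send(payload, 0, '127.0.0.1', port)
  Timeout.timeout(5) { inbox.pop }
ensure
  # Stop the receiving thread first, then close the sockets.
  if thread
    thread.kill
    thread.join
  end
  receiver.close if receiver
  sender.close if sender
end
```

Killing and joining the thread before closing the socket avoids closing a socket while a thread is still blocked in recv, which matches the order used for the write service threads in shutdown.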