Class: Rinda::RingServer

Inherits:
Object
Includes:
DRbUndumped
Defined in:
lib/rinda/ring.rb

Overview

A RingServer allows a Rinda::TupleSpace to be located via UDP broadcasts. Default service location uses the following steps:

  1. A RingServer begins listening on the network broadcast UDP address.

  2. A RingFinger sends a UDP packet containing the DRb URI where it will listen for a reply.

  3. The RingServer receives the UDP packet and connects back to the provided DRb URI with the DRb service.

A RingServer requires a TupleSpace:

ts = Rinda::TupleSpace.new
rs = Rinda::RingServer.new(ts)

RingServer can also listen on multicast addresses for announcements. This allows multiple RingServers to run on the same host. To use network broadcast and multicast:

ts = Rinda::TupleSpace.new
rs = Rinda::RingServer.new(ts, [Socket::INADDR_ANY, '239.0.0.1', 'ff02::1'])

Defined Under Namespace

Classes: Renewer

Constructor Details

#initialize(ts, addresses = [Socket::INADDR_ANY], port = Ring_PORT) ⇒ RingServer

Advertises ts on the given addresses at port.

If addresses is omitted only the UDP broadcast address is used.

addresses can contain multiple addresses. If a multicast address is given in addresses then the RingServer will listen for multicast queries.
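As a rough illustration of how an entry in addresses might be recognised as multicast, the sketch below uses the same Addrinfo predicates that make_socket relies on. The helper name address_kind and the port value 7647 are assumptions made for this example; they are not part of the RingServer API.

```ruby
require 'socket'

# Hypothetical helper (not part of Rinda): labels an address using the
# same Addrinfo multicast predicates that make_socket checks.
def address_kind(address, port = 7647) # port value is an assumption
  addrinfo = Addrinfo.udp(address, port)
  if addrinfo.ipv4_multicast?
    :ipv4_multicast
  elsif addrinfo.ipv6_multicast?
    :ipv6_multicast
  else
    :unicast_or_broadcast
  end
end

kinds = ['239.0.0.1', 'ff02::1', '127.0.0.1'].map { |a| address_kind(a) }
```

Only entries in the two multicast categories cause a group membership to be joined; everything else is bound directly.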

If you use IPv4 multicast you may need to set an address of the inbound interface which joins a multicast group.

ts = Rinda::TupleSpace.new
rs = Rinda::RingServer.new(ts, [['239.0.0.1', '9.5.1.1']])

You can set addresses as an Array Object. The first element of the Array is a multicast address and the second is an inbound interface address. If the second is omitted then ‘0.0.0.0’ is used.

If you use IPv6 multicast you may need to set both the local interface address and the inbound interface index:

rs = Rinda::RingServer.new(ts, [['ff02::1', '::1', 1]])

The first element is a multicast address and the second is an inbound interface address. The third is an inbound interface index.

At this time there is no easy way to get an interface index by name.

If the second is omitted then ‘::1’ is used. If the third is omitted then 0 (default interface) is used.
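One possible workaround for finding an interface index, assuming a Ruby version and platform where Socket.getifaddrs is available, is to build a name-to-index table yourself. The helper name interface_indexes is hypothetical and the interface name in the comment is only an example.

```ruby
require 'socket'

# Hypothetical helper (not part of Rinda): maps interface names to their
# numeric indexes using Socket.getifaddrs.
def interface_indexes
  Socket.getifaddrs.each_with_object({}) do |ifaddr, indexes|
    indexes[ifaddr.name] = ifaddr.ifindex
  end
end

indexes = interface_indexes
# An index found here could be passed as the third element of an IPv6
# multicast entry, e.g. ['ff02::1', '::1', indexes['eth0']].
# 'eth0' is only an example name and may not exist on a given host.
```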



# File 'lib/rinda/ring.rb', line 94

def initialize(ts, addresses=[Socket::INADDR_ANY], port=Ring_PORT)
  @port = port

  if Integer === addresses then
    addresses, @port = [Socket::INADDR_ANY], addresses
  end

  @renewer = Renewer.new

  @ts = ts
  @sockets = []
  addresses.each do |address|
    if Array === address
      make_socket(*address)
    else
      make_socket(address)
    end
  end

  @w_services = write_services
  @r_service  = reply_service
end

Instance Method Details

#do_reply ⇒ Object

Pulls lookup tuples out of the TupleSpace and sends their DRb object the address of the local TupleSpace.



# File 'lib/rinda/ring.rb', line 214

def do_reply
  tuple = @ts.take([:lookup_ring, nil], @renewer)
  Thread.new { tuple[1].call(@ts) rescue nil}
rescue
end
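The take-and-call pattern above can be tried without any networking. This is a minimal sketch, assuming a local TupleSpace and a plain lambda standing in for the remote DRb object that a real client would supply; the names found and callback are illustrative only.

```ruby
require 'rinda/tuplespace'

ts = Rinda::TupleSpace.new
found = Queue.new

# Stand-in for the client's DRb object: anything that responds to #call.
callback = ->(space) { found << space }

# A lookup request as it would sit in the TupleSpace, expiring after 5 seconds.
ts.write([:lookup_ring, callback], 5)

# Same steps as do_reply: take the request, then hand the TupleSpace
# to the requester on a separate thread.
tuple = ts.take([:lookup_ring, nil])
Thread.new { tuple[1].call(ts) rescue nil }.join

located = found.pop
```

Running the callback on its own thread keeps a slow or unreachable client from blocking the reply loop.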

#do_write(msg) ⇒ Object

Extracts the response URI from msg and adds it to the TupleSpace, where it will be picked up by reply_service for notification.



# File 'lib/rinda/ring.rb', line 189

def do_write(msg)
  Thread.new do
    begin
      tuple, sec = Marshal.load(msg)
      @ts.write(tuple, sec)
    rescue
    end
  end
end
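To make the decoding step concrete, the sketch below round-trips a message through Marshal and shows why the bare rescue matters: malformed packets raise and are silently dropped. The helper decode_lookup is hypothetical, and the URI string is a stand-in for the DRb object that a real lookup tuple carries.

```ruby
# Hypothetical helper (not part of Rinda): decodes a packet the way
# do_write does, returning nil instead of raising on malformed input.
def decode_lookup(msg)
  tuple, sec = Marshal.load(msg)
  [tuple, sec]
rescue StandardError
  nil
end

# A placeholder string is used where a real client would put a DRb object.
packet = Marshal.dump([[:lookup_ring, 'druby://client.example:9000'], 5])

good = decode_lookup(packet)                 # tuple plus its lifetime in seconds
bad  = decode_lookup('not a marshal stream') # rejected, so nil
```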

#make_socket(address, interface_address = nil, multicast_interface = 0) ⇒ Object

Creates a socket at address.

If address is a multicast address then interface_address and multicast_interface may optionally be set.

The created socket is bound to interface_address. If you use IPv4 multicast then the interface of interface_address is used as the inbound interface. If interface_address is omitted or nil then ‘0.0.0.0’ (IPv4) or ‘::1’ (IPv6) is used.

If you use IPv6 multicast then multicast_interface is used as the inbound interface. multicast_interface is a network interface index. If multicast_interface is omitted then 0 (default interface) is used.



# File 'lib/rinda/ring.rb', line 132

def make_socket(address, interface_address=nil, multicast_interface=0)
  addrinfo = Addrinfo.udp(address, @port)

  socket = Socket.new(addrinfo.pfamily, addrinfo.socktype,
                      addrinfo.protocol)
  @sockets << socket

  if addrinfo.ipv4_multicast? or addrinfo.ipv6_multicast? then
    if Socket.const_defined?(:SO_REUSEPORT) then
      socket.setsockopt(:SOCKET, :SO_REUSEPORT, true)
    else
      socket.setsockopt(:SOCKET, :SO_REUSEADDR, true)
    end

    if addrinfo.ipv4_multicast? then
      interface_address = '0.0.0.0' if interface_address.nil?
      socket.bind(Addrinfo.udp(interface_address, @port))

      mreq = IPAddr.new(addrinfo.ip_address).hton +
        IPAddr.new(interface_address).hton

      socket.setsockopt(:IPPROTO_IP, :IP_ADD_MEMBERSHIP, mreq)
    else
      interface_address = '::1' if interface_address.nil?
      socket.bind(Addrinfo.udp(interface_address, @port))

      mreq = IPAddr.new(addrinfo.ip_address).hton +
        [multicast_interface].pack('I')

      socket.setsockopt(:IPPROTO_IPV6, :IPV6_JOIN_GROUP, mreq)
    end
  else
    socket.bind(addrinfo)
  end

  socket
end
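The mreq values built in make_socket are plain byte strings. The following sketch, using the example addresses from this page, shows their layout; the variable names are illustrative.

```ruby
require 'ipaddr'

# IPv4 membership request: 4-byte group address followed by the
# 4-byte address of the inbound interface.
group_v4 = IPAddr.new('239.0.0.1').hton
iface_v4 = IPAddr.new('0.0.0.0').hton
mreq_v4  = group_v4 + iface_v4

# IPv6 membership request: 16-byte group address followed by the
# interface index packed as a native unsigned int.
group_v6 = IPAddr.new('ff02::1').hton
index_v6 = [0].pack('I') # 0 selects the default interface
mreq_v6  = group_v6 + index_v6
```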

#reply_service ⇒ Object

Creates a thread that notifies waiting clients from the TupleSpace.



# File 'lib/rinda/ring.rb', line 202

def reply_service
  Thread.new do
    loop do
      do_reply
    end
  end
end

#shutdown ⇒ Object

Shuts down the RingServer.



# File 'lib/rinda/ring.rb', line 223

def shutdown
  @renewer.renew = false

  @w_services.each do |thread|
    thread.kill
  end

  @sockets.each do |socket|
    socket.close
  end

  @r_service.kill
end

#write_services ⇒ Object

Creates threads that pick up UDP packets and pass them to do_write for decoding.



# File 'lib/rinda/ring.rb', line 174

def write_services
  @sockets.map do |s|
    Thread.new(s) do |socket|
      loop do
        msg = socket.recv(1024)
        do_write(msg)
      end
    end
  end
end
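The per-socket receive thread can be exercised on the loopback interface. This is a simplified sketch, assuming a single datagram rather than an endless loop, an OS-assigned port, and a placeholder payload instead of a marshalled lookup tuple.

```ruby
require 'socket'
require 'timeout'

receiver = UDPSocket.new
receiver.bind('127.0.0.1', 0) # port 0 lets the OS pick a free port
port = receiver.addr[1]

received = Queue.new

# One thread per socket, as in write_services, but reading a single
# datagram of at most 1024 bytes instead of looping forever.
listener = Thread.new(receiver) do |socket|
  received << socket.recv(1024)
end

sender = UDPSocket.new
sender.send('lookup request bytes', 0, '127.0.0.1', port)

# Guard against waiting forever if the datagram never arrives.
message = Timeout.timeout(5) { received.pop }
listener.join
sender.close
receiver.close
```

Passing the socket into Thread.new as an argument, as write_services does, gives each thread its own reference rather than sharing the block variable of the enclosing iteration.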