Class: Archipelago::Disco::Jockey

Inherits:
Object
  • Object
show all
Defined in:
lib/archipelago/disco.rb

Overview

The main discovery class used to both publish and lookup services.

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Jockey

Will create a Jockey service running on :address and :port or ADDRESS and PORT if none are given.

Will use the first available unicast port within :uniports, or within UNIPORTS if not given, for receiving unicast messages.

Will have a default :lookup_timeout of LOOKUP_TIMEOUT, a default :initial_lookup_standoff of INITIAL_LOOKUP_STANDOFF and a default :validation_interval of VALIDATION_INTERVAL.

Will only cache (and validate, which saves network traffic) stuff that has been looked up before if :thrifty_caching, or THRIFTY_CACHING if not given.

Will only reply to the one that sent out the query (and therefore save lots of network traffic) if :thrifty_replying, or THRIFTY_REPLYING if not given.

Will send out a multicast when a new service is published unless :thrifty_publishing, or THRIFTY_PUBLISHING if not given.

Will reply to all queries to which it has matching local services with a unicast message if :thrifty_replying, or if not given THRIFTY_REPLYING. Otherwise will reply with multicasts.



349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
# File 'lib/archipelago/disco.rb', line 349

# Builds a Jockey from +options+ and starts it.
#
# Creates the service registries, the message queues and the condition
# variable that is signalled when the set of services changes, then hands
# +options+ to #setup and finally invokes the start_* helpers (defined
# elsewhere in this class; #stop! kills the threads named after them).
#
# options:: a Hash of settings. This method itself only reads
#           :validation_interval (falling back to VALIDATION_INTERVAL);
#           everything else is interpreted by #setup.
def initialize(options = {})
  @valid = true

  # Registries of known services plus the set of queries we subscribed to.
  @remote_services = ServiceLocker.new
  @local_services = ServiceLocker.new
  @subscribed_services = Set.new

  # Message queues.
  @incoming, @outgoing = Queue.new, Queue.new

  # Waited on by #lookup, broadcast by #publish and #unpublish.
  @new_service_semaphore = MonitorMixin::ConditionVariable.new(Archipelago::Current::Lock.new)

  setup(options)

  # Order matters only in that it mirrors the original start-up sequence.
  [:start_listener, :start_unilistener, :start_shouter, :start_picker].each do |starter|
    __send__(starter)
  end
  start_validator(options[:validation_interval] || VALIDATION_INTERVAL)
end

Instance Method Details

#clear! ⇒ Object

Clears our local and remote services.



423
424
425
426
# File 'lib/archipelago/disco.rb', line 423

# Forgets every known service by replacing both the local and the remote
# registry with brand new, empty ServiceLocker instances.
#
# Returns the new remote registry (the value of the last assignment).
def clear!
  fresh_local = ServiceLocker.new
  fresh_remote = ServiceLocker.new

  @local_services = fresh_local
  @remote_services = fresh_remote
end

#lookup(match, timeout = @lookup_timeout) ⇒ Object

Lookup any services matching match, optionally with a timeout.

Will immediately return if we know of matching and valid services, will otherwise send out regular Queries and return as soon as matching services are found, or when the timeout runs out.



455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
# File 'lib/archipelago/disco.rb', line 455

# Looks up services matching +match+.
#
# A query for +match+ is queued on @outgoing first. If the remote or local
# registries already hold matches they are returned at once. Otherwise the
# method waits on @new_service_semaphore using a standoff that doubles after
# every wait, re-checking the registries and re-queueing the query until the
# timeout window has passed.
#
# NOTE: +match+ is modified in place: its :unicast_reply entry is set to
# this instance's unicast address. When @thrifty_caching is set, +match+ is
# also added to @subscribed_services.
#
# match::   the query to match services against.
# timeout:: how many seconds to keep looking once the initial standoff wait
#           is over; defaults to @lookup_timeout.
#
# Returns the matching remote services merged with the matching local ones,
# or an empty ServiceLocker if nothing matched in time.
def lookup(match, timeout = @lookup_timeout)
  match[:unicast_reply] = @unicast_address
  @subscribed_services << match if @thrifty_caching
  standoff = @initial_lookup_standoff

  # Snapshot of everything currently known that matches the query.
  find_known = lambda do
    @remote_services.get_services(match).merge(@local_services.get_services(match))
  end

  @outgoing << [nil, match]
  known_services = find_known.call
  return known_services unless known_services.empty?

  @new_service_semaphore.wait(standoff)
  standoff *= 2

  deadline = Time.now + timeout
  while Time.now < deadline
    known_services = find_known.call
    return known_services unless known_services.empty?

    @new_service_semaphore.wait(standoff)
    standoff *= 2

    @outgoing << [nil, match]
  end

  # The deadline can expire while we are still inside a wait. Look one last
  # time so that a service which arrived during that final wait is returned
  # instead of being silently dropped.
  known_services = find_known.call
  known_services.empty? ? ServiceLocker.new : known_services
end

#publish(service) ⇒ Object

Record the given service and broadcast about it.



484
485
486
487
488
489
490
491
492
493
# File 'lib/archipelago/disco.rb', line 484

# Registers +service+ as a local service and announces it.
#
# Nothing happens unless service.valid? is true. A valid service gets its
# :published_at entry stamped with the current time, is stored in
# @local_services under its :service_id, and everyone waiting on
# @new_service_semaphore is woken up. Unless @thrifty_publishing is set the
# service is also queued on @outgoing.
#
# service:: the service description to publish.
def publish(service)
  return unless service.valid?

  service[:published_at] = Time.now
  service_id = service[:service_id]
  @local_services[service_id] = service
  @new_service_semaphore.broadcast

  @outgoing << [nil, service] unless @thrifty_publishing
end

#setup(options = {}) ⇒ Object

Sets up this instance according to the given options.



372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
# File 'lib/archipelago/disco.rb', line 372

# Configures this instance from +options+ and opens its UDP sockets.
#
# Recognised options (each falls back to the constant of the same name in
# upper case when absent):
# :thrifty_caching, :thrifty_replying, :thrifty_publishing::
#   stored as given, so an explicit false or nil overrides the default.
# :lookup_timeout, :initial_lookup_standoff::
#   stored for later use by #lookup.
# :address, :port::
#   the multicast group and port to listen on and send to.
# :uniports::
#   the ports to try for the unicast listener; every port from its minimum
#   to its maximum is tried in ascending order.
#
# Raises Errno::EADDRINUSE when every port in :uniports is already taken.
def setup(options = {})
  @thrifty_caching = options.fetch(:thrifty_caching, THRIFTY_CACHING)
  @thrifty_replying = options.fetch(:thrifty_replying, THRIFTY_REPLYING)
  @thrifty_publishing = options.fetch(:thrifty_publishing, THRIFTY_PUBLISHING)
  @lookup_timeout = options[:lookup_timeout] || LOOKUP_TIMEOUT
  @initial_lookup_standoff = options[:initial_lookup_standoff] || INITIAL_LOOKUP_STANDOFF

  # Resolve the multicast endpoint once; it is used by listener and sender.
  multicast_address = options[:address] || ADDRESS
  multicast_port = options[:port] || PORT

  @listener = UDPSocket.new
  @unilistener = UDPSocket.new

  # IP_ADD_MEMBERSHIP takes the packed group address followed by the packed
  # local interface address; 0.0.0.0 leaves the interface choice to the OS.
  # IPAddr#hton replaces the deprecated Socket.gethostbyname("0.0.0.0")[3]
  # and produces the same four zero bytes.
  membership = IPAddr.new(multicast_address).hton + IPAddr.new("0.0.0.0").hton
  @listener.setsockopt(Socket::IPPROTO_IP, Socket::IP_ADD_MEMBERSHIP, membership)
  @listener.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, true)
  begin
    @listener.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEPORT, true)
  rescue
    # Deliberately best-effort: a failure to set SO_REUSEPORT is ignored
    # (presumably because not every platform provides the option).
  end
  @listener.bind('', multicast_port)

  # Walk upwards from the lowest candidate port until one can be bound.
  uniports = options[:uniports] || UNIPORTS
  this_port = uniports.min
  begin
    @unilistener.bind('', this_port)
  rescue Errno::EADDRINUSE
    # Out of candidates: let the original error propagate.
    raise unless this_port < uniports.max

    this_port += 1
    retry
  end
  @unicast_address = "#{HOST}:#{this_port}"

  @sender = UDPSocket.new
  @multiaddress = multicast_address
  @multiport = multicast_port
  @sender.connect(@multiaddress, @multiport)

  @unisender = UDPSocket.new
end

#stop! ⇒ Object

Stops all the threads in this instance.



431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
# File 'lib/archipelago/disco.rb', line 431

# Shuts this instance down. Does nothing if it has already been stopped.
#
# Marks the instance invalid, unpublishes every local service, kills the
# listener, unilistener, validator and picker threads, waits until the
# @outgoing queue has been drained and only then kills the shouter thread.
#
# Returns nil when already stopped, otherwise the result of killing the
# shouter thread.
def stop!
  return unless @valid

  @valid = false

  # #unpublish deletes from @local_services, so collect the ids first
  # instead of removing entries from the collection while iterating it.
  service_ids = []
  @local_services.each { |service_id, _description| service_ids << service_id }
  service_ids.each { |service_id| unpublish(service_id) }

  @listener_thread.kill
  @unilistener_thread.kill
  @validator_thread.kill
  @picker_thread.kill

  # Give queued messages (including the unpublish notices queued above)
  # a chance to leave @outgoing before the shouter thread is killed.
  sleep(0.01) until @outgoing.empty?
  @shouter_thread.kill
end

#unpublish(service_id) ⇒ Object

Removes the service with given service_id from the published services.



498
499
500
501
502
503
504
# File 'lib/archipelago/disco.rb', line 498

# Withdraws the local service registered under +service_id+.
#
# The entry is deleted from @local_services and everyone waiting on
# @new_service_semaphore is woken up. Unless @thrifty_publishing is set, an
# UnPublish message for +service_id+ is also queued on @outgoing.
#
# service_id:: the id the service was published under.
def unpublish(service_id)
  @local_services.delete(service_id)
  @new_service_semaphore.broadcast
  return if @thrifty_publishing

  notice = UnPublish.new(service_id)
  @outgoing << [nil, notice]
end
end