Class: Net::DNS::MDNS::Responder

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/net/dns/mdns.rb

Overview

:nodoc:

Constant Summary collapse

Addr =

mDNS link-local multicast address

"224.0.0.251"
Port =
5353
UDPSize =
9000

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeResponder

Returns a new instance of Responder.



427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
# File 'lib/net/dns/mdns.rb', line 427

def initialize
  @log = Logger.new(STDERR)

  @log.level = Logger::ERROR

  @mutex = Mutex.new

  @cache = Cache.new

  @queries = []

  @services = []

  @hostname = Name.create(Socket.gethostname)
  @hostname.absolute = true
  @hostaddr = Socket.getaddrinfo(@hostname.to_s, 0, Socket::AF_INET, Socket::SOCK_STREAM)[0][3]
  @hostrr   = [ @hostname, 240, IN::A.new(@hostaddr) ]
  @hostaddr = IPAddr.new(@hostaddr).hton

  debug( "start" )

  # TODO - I'm not sure about how robust this is. A better way to find the default
  # ifx would be to do:
  #   s = UDPSocket.new
  #   s.connect(any addr, any port)
  #   s.getsockname => struct sockaddr_in => ip_addr
  # But parsing a struct sockaddr_in is a PITA in ruby.

  @sock = UDPSocket.new

  # Set the close-on-exec flag, if supported.
  if Fcntl.constants.include? 'F_SETFD'
    @sock.fcntl(Fcntl::F_SETFD, 1)
  end

  # Allow 5353 to be shared.
  so_reuseport = 0x0200
  # The definition on OS X, where it is required, and where the shipped
  # ruby version (1.6) does not have Socket::SO_REUSEPORT. The definition
  # seems to be shared by at least some other BSD-derived stacks.
  if Socket.constants.include? 'SO_REUSEPORT'
    so_reuseport = Socket::SO_REUSEPORT
  end
  begin
    @sock.setsockopt(Socket::SOL_SOCKET, so_reuseport, 1)
  rescue
    warn( "set SO_REUSEPORT raised #{$!}, try SO_REUSEADDR" )
    so_reuseport = Socket::SO_REUSEADDR
    @sock.setsockopt(Socket::SOL_SOCKET, so_reuseport, 1)
  end

  # Request dest addr and ifx ids... no.

  # Bind to our port.
  @sock.bind(Socket::INADDR_ANY, Port)

  # Join the multicast group.
  #  option is a struct ip_mreq { struct in_addr, struct in_addr }
  ip_mreq =  IPAddr.new(Addr).hton + @hostaddr
  @sock.setsockopt(Socket::IPPROTO_IP, Socket::IP_ADD_MEMBERSHIP, ip_mreq)
  @sock.setsockopt(Socket::IPPROTO_IP, Socket::IP_MULTICAST_IF, @hostaddr)

  # Set IP TTL for outgoing packets.
  @sock.setsockopt(Socket::IPPROTO_IP, Socket::IP_TTL, 255)
  @sock.setsockopt(Socket::IPPROTO_IP, Socket::IP_MULTICAST_TTL, 255)

  # Apple source makes it appear that optval may need to be a "char" on
  # some systems:
  #  @sock.setsockopt(Socket::IPPROTO_IP, Socket::IP_MULTICAST_TTL, 255 as int)
  #     - or -
  #  @sock.setsockopt(Socket::IPPROTO_IP, Socket::IP_MULTICAST_TTL, 255 as byte)

  # Start responder and cacher threads.

  @waketime = nil

  @cacher_thrd = Thread.new do
    begin
      cacher_loop
    rescue
      error( "cacher_loop exited with #{$!}" )
      $!.backtrace.each do |e| error(e) end
    end
  end

  @responder_thrd = Thread.new do
    begin
      responder_loop
    rescue
      error( "responder_loop exited with #{$!}" )
      $!.backtrace.each do |e| error(e) end
    end
  end
end

Instance Attribute Details

#cacheObject (readonly)

Returns the value of attribute cache.



401
402
403
# File 'lib/net/dns/mdns.rb', line 401

def cache
  @cache
end

#hostaddrObject (readonly)

Returns the value of attribute hostaddr.



404
405
406
# File 'lib/net/dns/mdns.rb', line 404

def hostaddr
  @hostaddr
end

#hostnameObject (readonly)

Returns the value of attribute hostname.



403
404
405
# File 'lib/net/dns/mdns.rb', line 403

def hostname
  @hostname
end

#hostrrObject (readonly)

Returns the value of attribute hostrr.



405
406
407
# File 'lib/net/dns/mdns.rb', line 405

def hostrr
  @hostrr
end

#logObject

Returns the value of attribute log.



402
403
404
# File 'lib/net/dns/mdns.rb', line 402

def log
  @log
end

Instance Method Details

#cacher_loopObject



636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
# File 'lib/net/dns/mdns.rb', line 636

def cacher_loop
  delay = 0

  loop do

    if delay > 0
      sleep(delay)
    else
      sleep
    end

    @mutex.synchronize do
      debug( "sweep begin" )

      @waketime = nil

      msg = Message.new(0)
      msg.rd = 0
      msg.qr = 0
      msg.aa = 0

      now = Time.now.to_i

      # the earliest question or answer we need to wake for
      wakefor = nil

      # TODO - A delete expired, that yields every answer before
      # deleting it (so I can log it).
      # TODO - A #each_answer?
      @cache.cached.each do |name,rtypes|
        rtypes.each do |rtype, answers|
          # Delete expired answers.
          answers.delete_if do |an|
            if an.expired?
              debug( "-- a #{an}" )
              true
            end
          end
          # Requery answers that need refreshing, if there is a query that wants it.
          # Remember the earliest one we need to wake for.
          answers.each do |an|
            if an.refresh
              unless @queries.detect { |q| q.subscribes_to? an }
                debug( "no refresh of: a #{an}" )
                next
              end
              if now >= an.refresh
                an.retries += 1
                msg.add_question(name, an.data.class)
              end
              # TODO: cacher_loop exited with comparison of Bignum with nil failed, v2mdns.rb:478:in `<'
              begin
              if !wakefor || an.refresh < wakefor.refresh
                wakefor = an
              end
              rescue
                error( "an #{an.inspect}" )
                error( "wakefor #{wakefor.inspect}" )
                raise
              end
            end
          end
        end
      end

      @cache.asked.each do |name,rtypes|
        # Delete questions no query subscribes to, and that don't need refreshing.
        rtypes.delete_if do |rtype, qu|
          if !qu.refresh || !@queries.detect { |q| q.subscribes_to? qu }
            debug( "no refresh of: q #{qu}" )
            true
          end
        end
        # Requery questions that need refreshing.
        # Remember the earliest one we need to wake for.
        rtypes.each do |rtype, qu|
          if now >= qu.refresh
            msg.add_question(name, rtype)
          end
          if !wakefor || qu.refresh < wakefor.refresh
            wakefor = qu
          end
        end
      end

      msg.question.uniq!

      msg.each_question { |n,r| debug( "-> q #{n} #{DNS.rrname(r)}" ) }

      send(msg) if msg.question.first

      @waketime = wakefor.refresh if wakefor

      if @waketime
        delay = @waketime - Time.now.to_i
        delay = 1 if delay < 1

        debug( "refresh in #{delay} sec for #{wakefor}" )
      else
        delay = 0
      end

      debug( "sweep end" )
    end
  end # end loop
end

#debug(*args) ⇒ Object



417
418
419
# File 'lib/net/dns/mdns.rb', line 417

def debug(*args)
  @log.debug( *args ) if @log
end

#error(*args) ⇒ Object



423
424
425
# File 'lib/net/dns/mdns.rb', line 423

def error(*args)
  @log.error( *args ) if @log
end

#query_start(query, qu) ⇒ Object



767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
# File 'lib/net/dns/mdns.rb', line 767

def query_start(query, qu)
  @mutex.synchronize do
    begin
      debug( "start query #{query} with qu #{qu.inspect}" )

      @queries << query

      qu = @cache.add_question(qu)

      wake_cacher_for(qu)

      answers = @cache.answers_for(query.name, query.type)

      query.push( answers )
     
      # If it wasn't added, then we already are asking the question,
      # don't ask it again.
      if qu
        qmsg = Message.new(0)
        qmsg.rd = 0
        qmsg.qr = 0
        qmsg.aa = 0
        qmsg.add_question(qu.name, qu.type)
        
        send(qmsg)
      end
    rescue
      warn( "fail query #{query} - #{$!}" )
      @queries.delete(query)
      raise
    end
  end
end

#query_stop(query) ⇒ Object



801
802
803
804
805
806
# File 'lib/net/dns/mdns.rb', line 801

def query_stop(query)
  @mutex.synchronize do
    debug( "query #{query} - stop" )
    @queries.delete(query)
  end
end

#responder_loopObject



522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
# File 'lib/net/dns/mdns.rb', line 522

def responder_loop
  loop do
    # from is [ AF_INET, port, name, addr ]
    reply, from = @sock.recvfrom(UDPSize)
    qaddr = from[3]
    qport = from[1]

    @mutex.synchronize do

      begin
        msg =  Message.decode(reply)

        qid  = msg.id
        qr   = msg.qr == 0 ? 'Q' : 'R'
        qcnt = msg.question.size
        acnt = msg.answer.size

        debug( "from #{qaddr}:#{qport} -> id #{qid} qr=#{qr} qcnt=#{qcnt} acnt=#{acnt}" )

        if( msg.query? )
          # Cache questions:
          # - ignore unicast queries
          # - record the question as asked
          # - TODO flush any answers we have over 1 sec old (otherwise if a machine goes down, its
          #    answers stay until there ttl, which can be very long!)
          msg.each_question do |name, type, unicast|
            next if unicast

            debug( "++ q #{name.to_s}/#{DNS.rrname(type)}" )

            @cache.cache_question(name, type)
          end

          # Answer questions for registered services:
          # - don't multicast answers to unicast questions
          # - let each service add any records that answer the question
          # - delete duplicate answers
          # - delete known answers (see MDNS:7.1)
          # - send an answer if there are any answers
          amsg = Message.new(0)
          amsg.rd = 0
          amsg.qr = 1
          amsg.aa = 1
          msg.each_question do |name, type, unicast|
            next if unicast

            debug( "ask? #{name}/#{DNS.rrname(type)}" )
            @services.each do |svc|
              svc.answer_question(name, type, amsg)
            end
          end

          amsg.question.uniq!
          amsg.answer.uniq!
          amsg.additional.uniq!

          amsg.answer.delete_if do |an|
            msg.answer.detect do |known|
              # Recall: an = [ name, ttl, data, cacheflush ]
              if(an[0] == known[0] && an[2] == known[2] && (an[1]/2) < known[1])
                true # an is a duplicate, and known is not about to expire
              else
                false
              end
            end
          end

          send(amsg, qid, qaddr, qport) if amsg.answer.first

        else
          # Cache answers:
          cached = []
          msg.each_answer do |n, ttl, data, cacheflush|

            a = Answer.new(n, ttl, data, cacheflush)
            debug( "++ a #{ a }" )
            a = @cache.cache_answer(a)
            debug( " cached" ) if a

            # If a wasn't cached, then its an answer we already have, don't push it.
            cached << a if a

            wake_cacher_for(a)
          end

          # Push answers to Queries:
          # TODO - push all answers, let the Query do what it wants with them.
          @queries.each do |q|
            answers = cached.select { |an| q.subscribes_to? an }

            debug( "push #{answers.length} to #{q}" )

            q.push( answers )
          end

        end

      rescue DecodeError
        warn( "decode error: #{reply.inspect}" )
      end

    end # end sync
  end # end loop
end

#send(msg, qid = nil, qaddr = nil, qport = nil) ⇒ Object



743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
# File 'lib/net/dns/mdns.rb', line 743

def send(msg, qid = nil, qaddr = nil, qport = nil)
  begin
    msg.answer.each do |an|
      debug( "-> an #{an[0]} (#{an[1]}) #{an[2].to_s} #{an[3].inspect}" )
    end
    msg.additional.each do |an|
      debug( "-> ad #{an[0]} (#{an[1]}) #{an[2].to_s} #{an[3].inspect}" )
    end
    # Unicast response directly to questioner if source port is not 5353.
    if qport && qport != Port
      debug( "unicast for qid #{qid} to #{qaddr}:#{qport}" )
      msg.id = qid
      @sock.send(msg.encode, 0, qaddr, qport)
    end
    # ID is always zero for mcast, don't repeat questions for mcast
    msg.id = 0
    msg.question.clear unless msg.query?
    @sock.send(msg.encode, 0, Addr, Port)
  rescue
    error( "send msg failed: #{$!}" )
    raise
  end
end

#service_start(service, announce_answers = []) ⇒ Object



808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
# File 'lib/net/dns/mdns.rb', line 808

def service_start(service, announce_answers = [])
  @mutex.synchronize do
    begin
      @services << service

      debug( "start service #{service.to_s}" )

      if announce_answers.first
        smsg = Message.new(0)
        smsg.rd = 0
        smsg.qr = 1
        smsg.aa = 1
        announce_answers.each do |a|
          smsg.add_answer(*a)
        end
        send(smsg)
      end

    rescue
      warn( "fail service #{service} - #{$!}" )
      @queries.delete(service)
      raise
    end
  end
end

#service_stop(service) ⇒ Object



834
835
836
837
838
839
# File 'lib/net/dns/mdns.rb', line 834

def service_stop(service)
  @mutex.synchronize do
    debug( "service #{service} - stop" )
    @services.delete(service)
  end
end

#wake_cacher_for(item) ⇒ Object

wake sweeper if cache item needs refreshing before current waketime



628
629
630
631
632
633
634
# File 'lib/net/dns/mdns.rb', line 628

def wake_cacher_for(item)
  return unless item

  if !@waketime || @waketime == 0 || item.refresh < @waketime
    @cacher_thrd.wakeup
  end
end

#warn(*args) ⇒ Object



420
421
422
# File 'lib/net/dns/mdns.rb', line 420

def warn(*args)
  @log.warn( *args ) if @log
end