Class: QuartzTorrent::PeerClientHandler

Inherits:
Handler
  • Object
show all
Defined in:
lib/quartz_torrent/peerclient.rb

Overview

This class implements a Reactor Handler object. This Handler implements the PeerClient.

Instance Attribute Summary collapse

Attributes inherited from Handler

#reactor

Instance Method Summary collapse

Methods inherited from Handler

#cancelTimer, #close, #connect, #connectError, #currentIo, #findIoByMetainfo, #read, #scheduleTimer, #setMetaInfo, #setReadRateLimit, #setWriteRateLimit, #stopReactor, #stopped?, #userEvent, #write

Constructor Details

#initialize(baseDirectory, maxIncomplete = 5, maxActive = 10) ⇒ PeerClientHandler

Returns a new instance of PeerClientHandler.



258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# File 'lib/quartz_torrent/peerclient.rb', line 258

def initialize(baseDirectory, maxIncomplete = 5, maxActive = 10)
  # Hash of TorrentData objects, keyed by torrent infoHash
  @torrentData = {}
  @torrentQueue = TorrentQueue.new(maxIncomplete, maxActive)

  @baseDirectory = baseDirectory

  @logger = LogManager.getLogger("peerclient")

  # Overall maximum number of peers (connected + disconnected)
  @maxPeerCount = 120
  # Number of peers we ideally want to try and be downloading/uploading with
  @targetActivePeerCount = 50
  @targetUnchokedPeerCount = 4
  @managePeersPeriod = 10 # Defined in bittorrent spec. Only unchoke peers every 10 seconds.
  @requestBlocksPeriod = 1
  @handshakeTimeout = 1
  @requestTimeout = 60
  @endgameBlockThreshold = 20
end

Instance Attribute Details

#torrentDataObject (readonly)

PUBLIC API METHODS ################################################



281
282
283
# File 'lib/quartz_torrent/peerclient.rb', line 281

def torrentData
  @torrentData
end

Instance Method Details

#addTrackerClient(infoHash, info, trackerclient) ⇒ Object

Add a new tracker client. This effectively adds a new torrent to download. Returns the TorrentData object for the new torrent.



285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/quartz_torrent/peerclient.rb', line 285

def addTrackerClient(infoHash, info, trackerclient)
  raise "There is already a tracker registered for torrent #{QuartzTorrent.bytesToHex(infoHash)}" if @torrentData.has_key? infoHash
  torrentData = TorrentData.new(infoHash, info, trackerclient)
  trackerclient.alarms = torrentData.alarms
  @torrentData[infoHash] = torrentData
  torrentData.info = info
  torrentData.state = :initializing

  queue(torrentData)
  dequeue      

  torrentData
end

#adjustBytesDownloaded(infoHash, adjustment) ⇒ Object

Adjust the bytesDownloaded property of the specified torrent by the passed amount. Adjustment should be an integer. It is added to the current bytesDownloaded amount.



404
405
406
407
408
409
410
411
412
413
414
415
# File 'lib/quartz_torrent/peerclient.rb', line 404

def adjustBytesDownloaded(infoHash, adjustment)
  torrentData = @torrentData[infoHash]
  if ! torrentData
    @logger.warn "Asked to adjust uploaded bytes for a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}"
    return
  end
  
  runInReactorThread do
    torrentData.bytesDownloaded += adjustment
    torrentData.bytesDownloadedDataOnly += adjustment
  end
end

#adjustBytesUploaded(infoHash, adjustment) ⇒ Object

Adjust the bytesUploaded property of the specified torrent by the passed amount. Adjustment should be an integer. It is added to the current bytesUploaded amount.



389
390
391
392
393
394
395
396
397
398
399
400
# File 'lib/quartz_torrent/peerclient.rb', line 389

def adjustBytesUploaded(infoHash, adjustment)
  torrentData = @torrentData[infoHash]
  if ! torrentData
    @logger.warn "Asked to adjust uploaded bytes for a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}"
    return
  end
  
  runInReactorThread do
    torrentData.bytesUploaded += adjustment
    torrentData.bytesUploadedDataOnly += adjustment
  end
end

#clientInit(peer) ⇒ Object

Reactor method called when we have connected to a peer.



553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
# File 'lib/quartz_torrent/peerclient.rb', line 553

def clientInit(peer)
  # We connected to a peer
  # Send handshake
  torrentData = @torrentData[peer.infoHash]
  if ! torrentData
    @logger.warn "No tracker client found for peer #{peer}. Closing connection."
    close
    return
  end
  trackerclient = torrentData.trackerClient

  @logger.info "Connected to peer #{peer}. Sending handshake."
  msg = PeerHandshake.new
  msg.peerId = trackerclient.peerId
  msg.infoHash = peer.infoHash
  msg.serializeTo currentIo
  peer.state = :handshaking
  @reactor.scheduleTimer(@handshakeTimeout, [:handshake_timeout, peer], false)
  @logger.debug "Done sending handshake."

  # Send bitfield
  sendBitfield(currentIo, torrentData.blockState.completePieceBitfield) if torrentData.blockState

  setReadRateLimit(torrentData.downRateLimit) if torrentData.downRateLimit
  setWriteRateLimit(torrentData.upRateLimit) if torrentData.upRateLimit
end

#error(peer, details) ⇒ Object

Reactor method called when an IO error occurs.



714
715
716
717
718
719
720
721
722
723
724
725
# File 'lib/quartz_torrent/peerclient.rb', line 714

def error(peer, details)
  # If a peer closes the connection during handshake before we determine their id, we don't have a completed
  # Peer object yet. In this case the peer parameter is the symbol :listener_socket
  if peer == :listener_socket
    @logger.info "Error with handshaking peer: #{details}. Closing connection."
  else
    @logger.info "Error with peer #{peer}: #{details}. Closing connection."
    setPeerDisconnected(peer)
  end
  # Close connection
  close
end

#getDelegateTorrentData(infoHash = nil) ⇒ Object

Get a hash of new TorrentDataDelegate objects keyed by torrent infohash. This method is meant to be called from a different thread than the one the reactor is running in. This method is not immediate but blocks until the data is prepared. If infoHash is passed, only that torrent data is returned (still in a hashtable; just one entry)



422
423
424
425
426
427
428
429
430
431
432
433
434
# File 'lib/quartz_torrent/peerclient.rb', line 422

def getDelegateTorrentData(infoHash = nil)
  # Use an immediate, non-recurring timer.
  result = {}
  return result if stopped?
  semaphore = Semaphore.new
  timer = @reactor.scheduleTimer(0, [:get_torrent_data, result, semaphore, infoHash], false, true)
  if semaphore.wait(3)
    result
  else
    @logger.warn "getDelegateTorrentData: Waiting on semaphore timed out"
    throw "Waiting on semaphore for timer #{timer.object_id} timed out"
  end
end

#recvData(peer) ⇒ Object

Reactor method called when there is data ready to be read from a socket



581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
# File 'lib/quartz_torrent/peerclient.rb', line 581

def recvData(peer)
  msg = nil

  @logger.debug "Got data from peer #{peer}"

  if peer.state == :handshaking
    # Read handshake message
    begin
      @logger.debug "Reading handshake from #{peer}"
      msg = PeerHandshake.unserializeFrom currentIo
    rescue
      @logger.warn "Peer #{peer} failed handshake: #{$!}"
      setPeerDisconnected(peer)
      close
      return
    end
  else
    begin
      @logger.debug "Reading wire-message from #{peer}"
      msg = peer.peerMsgSerializer.unserializeFrom currentIo
      #msg = PeerWireMessage.unserializeFrom currentIo
    rescue EOFError
      @logger.info "Peer #{peer} disconnected."
      setPeerDisconnected(peer)
      close
      return
    rescue
      @logger.warn "Unserializing message from peer #{peer} failed: #{$!}"
      @logger.warn $!.backtrace.join "\n"
      setPeerDisconnected(peer)
      close
      return
    end

    peer.updateUploadRate msg
    torrentData = @torrentData[peer.infoHash]
    torrentData.bytesDownloaded += msg.length if torrentData
    @logger.debug "Peer #{peer} upload rate: #{peer.uploadRate.value}  data only: #{peer.uploadRateDataOnly.value}"
  end


  if msg.is_a? PeerHandshake
    # This is a remote peer that we connected to returning our handshake.
    processHandshake(msg, peer)
    peer.state = :established
    peer.amChoked = true
    peer.peerChoked = true
    peer.amInterested = false
    peer.peerInterested = false
  elsif msg.is_a? BitfieldMessage
    @logger.debug "Received bitfield message from peer."
    handleBitfield(msg, peer)
  elsif msg.is_a? Unchoke
    @logger.debug "Received unchoke message from peer."
    peer.amChoked = false
  elsif msg.is_a? Choke
    @logger.debug "Received choke message from peer."
    peer.amChoked = true
  elsif msg.is_a? Interested
    @logger.debug "Received interested message from peer."
    peer.peerInterested = true
  elsif msg.is_a? Uninterested
    @logger.debug "Received uninterested message from peer."
    peer.peerInterested = false
  elsif msg.is_a? Piece
    @logger.debug "Received piece message from peer for torrent #{QuartzTorrent.bytesToHex(peer.infoHash)}: piece #{msg.pieceIndex} offset #{msg.blockOffset} length #{msg.data.length}."
    handlePieceReceive(msg, peer)
  elsif msg.is_a? Request
    @logger.debug "Received request message from peer for torrent #{QuartzTorrent.bytesToHex(peer.infoHash)}: piece #{msg.pieceIndex} offset #{msg.blockOffset} length #{msg.blockLength}."
    handleRequest(msg, peer)
  elsif msg.is_a? Have
    @logger.debug "Received have message from peer for torrent #{QuartzTorrent.bytesToHex(peer.infoHash)}: piece #{msg.pieceIndex}"
    handleHave(msg, peer)
  elsif msg.is_a? KeepAlive
    @logger.debug "Received keep alive message from peer."
  elsif msg.is_a? ExtendedHandshake
    @logger.debug "Received extended handshake message from peer."
    handleExtendedHandshake(msg, peer)
  elsif msg.is_a? ExtendedMetaInfo
    @logger.debug "Received extended metainfo message from peer."
    handleExtendedMetainfo(msg, peer)
  else
    @logger.warn "Received a #{msg.class} message but handler is not implemented"
  end
end

#removeTorrent(infoHash, deleteFiles = false) ⇒ Object

Remove a torrent.



300
301
302
303
304
# File 'lib/quartz_torrent/peerclient.rb', line 300

def removeTorrent(infoHash, deleteFiles = false)
  # Can't do this right now, since it could be in use by an event handler. Use an immediate, non-recurring timer instead.
  @logger.info "#{QuartzTorrent.bytesToHex(infoHash)}: Scheduling immediate timer to remove torrent. #{deleteFiles ? "Will" : "Wont"} delete downloaded files."
  @reactor.scheduleTimer(0, [:removetorrent, infoHash, deleteFiles], false, true)
end

#serverInit(metadata, addr, port) ⇒ Object

Reactor method called when a peer has connected to us.



450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
# File 'lib/quartz_torrent/peerclient.rb', line 450

def serverInit(, addr, port)
  # A peer connected to us
  # Read handshake message
  @logger.warn "Peer connection from #{addr}:#{port}"
  begin
    msg = PeerHandshake.unserializeExceptPeerIdFrom currentIo
  rescue
    @logger.warn "Peer failed handshake: #{$!}"
    close
    return
  end

  torrentData = torrentDataForHandshake(msg, "#{addr}:#{port}")
  # Are we tracking this torrent?
  if !torrentData
    @logger.warn "Peer sent handshake for unknown torrent"
    close
    return 
  end
  trackerclient = torrentData.trackerClient

  # If we already have too many connections, don't allow this connection.
  classifiedPeers = ClassifiedPeers.new torrentData.peers.all
  if classifiedPeers.establishedPeers.length > @targetActivePeerCount
    @logger.warn "Closing connection to peer from #{addr}:#{port} because we already have #{classifiedPeers.establishedPeers.length} active peers which is > the target count of #{@targetActivePeerCount} "
    close
    return 
  end  

  # Send handshake
  outgoing = PeerHandshake.new
  outgoing.peerId = trackerclient.peerId
  outgoing.infoHash = torrentData.infoHash
  outgoing.serializeTo currentIo

  # Send extended handshake if the peer supports extensions
  if (msg.reserved.unpack("C8")[5] & 0x10) != 0
    @logger.warn "Peer supports extensions. Sending extended handshake"
    extended = Extension.createExtendedHandshake torrentData.info
    extended.serializeTo currentIo
  end
 
  # Read incoming handshake's peerid
  msg.peerId = currentIo.read(PeerHandshake::PeerIdLen)

  if msg.peerId == trackerclient.peerId
    @logger.info "We got a connection from ourself. Closing connection."
    close
    return
  end
 
  peer = nil
  peers = torrentData.peers.findById(msg.peerId)
  if peers
    peers.each do |existingPeer|
      if existingPeer.state != :disconnected
        @logger.warn "Peer with id #{msg.peerId} created a new connection when we already have a connection in state #{existingPeer.state}. Closing new connection."
        close
        return
      else
        if existingPeer.trackerPeer.ip == addr && existingPeer.trackerPeer.port == port
          peer = existingPeer
        end
      end
    end
  end

  if ! peer
    peer = Peer.new(TrackerPeer.new(addr, port))
    updatePeerWithHandshakeInfo(torrentData, msg, peer)
    torrentData.peers.add peer
    if ! peers
      @logger.warn "Unknown peer with id #{msg.peerId} connected."
    else
      @logger.warn "Known peer with id #{msg.peerId} connected from new location."
    end
  else
    @logger.warn "Known peer with id #{msg.peerId} connected from known location."
  end

  @logger.info "Peer #{peer} connected to us. "

  peer.state = :established
  peer.amChoked = true
  peer.peerChoked = true
  peer.amInterested = false
  peer.peerInterested = false
  if torrentData.info
    peer.bitfield = Bitfield.new(torrentData.info.pieces.length)
  else
    peer.bitfield = EmptyBitfield.new
    @logger.info "We have no metainfo yet, so setting peer #{peer} to have an EmptyBitfield"
  end

  # Send bitfield
  sendBitfield(currentIo, torrentData.blockState.completePieceBitfield) if torrentData.blockState

  setMetaInfo(peer)
  setReadRateLimit(torrentData.downRateLimit) if torrentData.downRateLimit
  setWriteRateLimit(torrentData.upRateLimit) if torrentData.upRateLimit
end

#setDownloadRateLimit(infoHash, bytesPerSecond) ⇒ Object

Set the download rate limit. Pass nil as the bytesPerSecond to disable the limit.



314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
# File 'lib/quartz_torrent/peerclient.rb', line 314

def setDownloadRateLimit(infoHash, bytesPerSecond)
  torrentData = @torrentData[infoHash]
  if ! torrentData
    @logger.warn "Asked to set download rate limit for a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}"
    return
  end
 
  if bytesPerSecond
    if ! torrentData.downRateLimit
      torrentData.downRateLimit = RateLimit.new(bytesPerSecond, 2*bytesPerSecond, 0)
    else
      torrentData.downRateLimit.unitsPerSecond = bytesPerSecond
    end
  else
    torrentData.downRateLimit = nil
  end

  torrentData.peers.all.each do |peer|
    withPeersIo(peer, "setting download rate limit") do |io|
      io.readRateLimit = torrentData.downRateLimit
    end
  end
 
end

#setPaused(infoHash, value) ⇒ Object

Pause or unpause the specified torrent.



307
308
309
310
311
# File 'lib/quartz_torrent/peerclient.rb', line 307

def setPaused(infoHash, value)
  # Can't do this right now, since it could be in use by an event handler. Use an immediate, non-recurring timer instead.
  @logger.info "#{QuartzTorrent.bytesToHex(infoHash)}: Scheduling immediate timer to #{value ? "pause" : "unpause"} torrent."
  @reactor.scheduleTimer(0, [:pausetorrent, infoHash, value], false, true)
end

#setUploadDuration(infoHash, seconds) ⇒ Object

Set the maximum amount of time (in seconds) that a torrent can be in the upload-only state before it is paused. Pass nil to disable.



377
378
379
380
381
382
383
384
385
# File 'lib/quartz_torrent/peerclient.rb', line 377

def setUploadDuration(infoHash, seconds)
  torrentData = @torrentData[infoHash]
  if ! torrentData
    @logger.warn "Asked to set upload duration for a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}"
    return
  end

  torrentData.uploadDuration = seconds
end

#setUploadRateLimit(infoHash, bytesPerSecond) ⇒ Object

Set the upload rate limit. Pass nil as the bytesPerSecond to disable the limit.



340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
# File 'lib/quartz_torrent/peerclient.rb', line 340

def setUploadRateLimit(infoHash, bytesPerSecond)
  torrentData = @torrentData[infoHash]
  if ! torrentData
    @logger.warn "Asked to set upload rate limit for a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}"
    return
  end

  if bytesPerSecond
    if ! torrentData.upRateLimit
      torrentData.upRateLimit = RateLimit.new(bytesPerSecond, 2*bytesPerSecond, 0)
    else
      torrentData.upRateLimit.unitsPerSecond = bytesPerSecond
    end
  else
    torrentData.upRateLimit = nil
  end

  torrentData.peers.all.each do |peer|
    withPeersIo(peer, "setting upload rate limit") do |io|
      io.writeRateLimit = torrentData.upRateLimit
    end
  end
end

#setUploadRatio(infoHash, ratio) ⇒ Object

Set the upload ratio. Pass nil to disable



365
366
367
368
369
370
371
372
373
# File 'lib/quartz_torrent/peerclient.rb', line 365

def setUploadRatio(infoHash, ratio)
  torrentData = @torrentData[infoHash]
  if ! torrentData
    @logger.warn "Asked to set upload ratio limit for a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}"
    return
  end

  torrentData.ratio = ratio
end

#timerExpired(metadata) ⇒ Object

Reactor method called when a scheduled timer expires.



668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
# File 'lib/quartz_torrent/peerclient.rb', line 668

def timerExpired()
  if .is_a?(Array) && [0] == :manage_peers
    managePeers([1])
  elsif .is_a?(Array) && [0] == :request_blocks
    requestBlocks([1])
  elsif .is_a?(Array) && [0] == :check_piece_manager
    checkPieceManagerResults([1])
  elsif .is_a?(Array) && [0] == :handshake_timeout
    handleHandshakeTimeout([1])
  elsif .is_a?(Array) && [0] == :removetorrent
    handleRemoveTorrent([1], [2])
  elsif .is_a?(Array) && [0] == :pausetorrent
    handlePause([1], [2])
  elsif .is_a?(Array) && [0] == :get_torrent_data
    @torrentData.each do |k,v|
      begin
        if [3].nil? || k == [3]
          v = TorrentDataDelegate.new(v, self)
          [1][k] = v
        end
      rescue
        @logger.error "Error building torrent data response for user: #{$!}"
        @logger.error "#{$!.backtrace.join("\n")}"
      end
    end
    [2].signal
  elsif .is_a?(Array) && [0] == :update_torrent_data
    delegate = [1]
    if ! @torrentData.has_key?(infoHash)
      delegate.state = :deleted 
    else
      delegate.internalRefresh
    end
    [2].signal
  elsif .is_a?(Array) && [0] == :request_metadata_pieces
    requestMetadataPieces([1])
  elsif .is_a?(Array) && [0] == :check_metadata_piece_manager
    checkMetadataPieceManagerResults([1])
  elsif .is_a?(Array) && [0] == :runproc
    [1].call
  else
    @logger.info "Unknown timer #{} expired."
  end
end

#updateDelegateTorrentData(delegate) ⇒ Object

Update the data stored in a TorrentDataDelegate to the latest information.



437
438
439
440
441
442
443
444
# File 'lib/quartz_torrent/peerclient.rb', line 437

def updateDelegateTorrentData(delegate)
  return if stopped?
  # Use an immediate, non-recurring timer.
  semaphore = Semaphore.new
  @reactor.scheduleTimer(0, [:update_torrent_data, delegate, semaphore], false, true)
  semaphore.wait
  result
end