Class: QuartzTorrent::PeerClientHandler
- Defined in:
- lib/quartz_torrent/peerclient.rb
Overview
This class implements a Reactor Handler object. This Handler implements the PeerClient.
Instance Attribute Summary collapse
-
#torrentData ⇒ Object
readonly
PUBLIC API METHODS ################################################.
Attributes inherited from Handler
Instance Method Summary collapse
-
#addTrackerClient(infoHash, info, trackerclient) ⇒ Object
Add a new tracker client.
-
#adjustBytesDownloaded(infoHash, adjustment) ⇒ Object
Adjust the bytesDownloaded property of the specified torrent by the passed amount.
-
#adjustBytesUploaded(infoHash, adjustment) ⇒ Object
Adjust the bytesUploaded property of the specified torrent by the passed amount.
-
#clientInit(peer) ⇒ Object
Reactor method called when we have connected to a peer.
-
#error(peer, details) ⇒ Object
Reactor method called when an IO error occurs.
-
#getDelegateTorrentData(infoHash = nil) ⇒ Object
Get a hash of new TorrentDataDelegate objects keyed by torrent infohash.
-
#initialize(baseDirectory, maxIncomplete = 5, maxActive = 10) ⇒ PeerClientHandler
constructor
A new instance of PeerClientHandler.
-
#recvData(peer) ⇒ Object
Reactor method called when there is data ready to be read from a socket.
-
#removeTorrent(infoHash, deleteFiles = false) ⇒ Object
Remove a torrent.
-
#serverInit(metadata, addr, port) ⇒ Object
Reactor method called when a peer has connected to us.
-
#setDownloadRateLimit(infoHash, bytesPerSecond) ⇒ Object
Set the download rate limit.
-
#setPaused(infoHash, value) ⇒ Object
Pause or unpause the specified torrent.
-
#setUploadDuration(infoHash, seconds) ⇒ Object
Set the maximum amount of time (in seconds) that a torrent can be in the upload-only state before it is paused.
-
#setUploadRateLimit(infoHash, bytesPerSecond) ⇒ Object
Set the upload rate limit.
-
#setUploadRatio(infoHash, ratio) ⇒ Object
Set the upload ratio.
-
#timerExpired(metadata) ⇒ Object
Reactor method called when a scheduled timer expires.
-
#updateDelegateTorrentData(delegate) ⇒ Object
Update the data stored in a TorrentDataDelegate to the latest information.
Methods inherited from Handler
#cancelTimer, #close, #connect, #connectError, #currentIo, #findIoByMetainfo, #read, #scheduleTimer, #setMetaInfo, #setReadRateLimit, #setWriteRateLimit, #stopReactor, #stopped?, #userEvent, #write
Constructor Details
#initialize(baseDirectory, maxIncomplete = 5, maxActive = 10) ⇒ PeerClientHandler
Returns a new instance of PeerClientHandler.
258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 |
# File 'lib/quartz_torrent/peerclient.rb', line 258 def initialize(baseDirectory, maxIncomplete = 5, maxActive = 10) # Hash of TorrentData objects, keyed by torrent infoHash @torrentData = {} @torrentQueue = TorrentQueue.new(maxIncomplete, maxActive) @baseDirectory = baseDirectory @logger = LogManager.getLogger("peerclient") # Overall maximum number of peers (connected + disconnected) @maxPeerCount = 120 # Number of peers we ideally want to try and be downloading/uploading with @targetActivePeerCount = 50 @targetUnchokedPeerCount = 4 @managePeersPeriod = 10 # Defined in bittorrent spec. Only unchoke peers every 10 seconds. @requestBlocksPeriod = 1 @handshakeTimeout = 1 @requestTimeout = 60 @endgameBlockThreshold = 20 end |
Instance Attribute Details
#torrentData ⇒ Object (readonly)
PUBLIC API METHODS ################################################
281 282 283 |
# File 'lib/quartz_torrent/peerclient.rb', line 281 def torrentData @torrentData end |
Instance Method Details
#addTrackerClient(infoHash, info, trackerclient) ⇒ Object
Add a new tracker client. This effectively adds a new torrent to download. Returns the TorrentData object for the new torrent.
285 286 287 288 289 290 291 292 293 294 295 296 297 |
# File 'lib/quartz_torrent/peerclient.rb', line 285 def addTrackerClient(infoHash, info, trackerclient) raise "There is already a tracker registered for torrent #{QuartzTorrent.bytesToHex(infoHash)}" if @torrentData.has_key? infoHash torrentData = TorrentData.new(infoHash, info, trackerclient) trackerclient.alarms = torrentData.alarms @torrentData[infoHash] = torrentData torrentData.info = info torrentData.state = :initializing queue(torrentData) dequeue torrentData end |
#adjustBytesDownloaded(infoHash, adjustment) ⇒ Object
Adjust the bytesDownloaded property of the specified torrent by the passed amount. Adjustment should be an integer. It is added to the current bytesDownloaded amount.
404 405 406 407 408 409 410 411 412 413 414 415 |
# File 'lib/quartz_torrent/peerclient.rb', line 404 def adjustBytesDownloaded(infoHash, adjustment) torrentData = @torrentData[infoHash] if ! torrentData @logger.warn "Asked to adjust uploaded bytes for a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}" return end runInReactorThread do torrentData.bytesDownloaded += adjustment torrentData.bytesDownloadedDataOnly += adjustment end end |
#adjustBytesUploaded(infoHash, adjustment) ⇒ Object
Adjust the bytesUploaded property of the specified torrent by the passed amount. Adjustment should be an integer. It is added to the current bytesUploaded amount.
389 390 391 392 393 394 395 396 397 398 399 400 |
# File 'lib/quartz_torrent/peerclient.rb', line 389 def adjustBytesUploaded(infoHash, adjustment) torrentData = @torrentData[infoHash] if ! torrentData @logger.warn "Asked to adjust uploaded bytes for a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}" return end runInReactorThread do torrentData.bytesUploaded += adjustment torrentData.bytesUploadedDataOnly += adjustment end end |
#clientInit(peer) ⇒ Object
Reactor method called when we have connected to a peer.
553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 |
# File 'lib/quartz_torrent/peerclient.rb', line 553 def clientInit(peer) # We connected to a peer # Send handshake torrentData = @torrentData[peer.infoHash] if ! torrentData @logger.warn "No tracker client found for peer #{peer}. Closing connection." close return end trackerclient = torrentData.trackerClient @logger.info "Connected to peer #{peer}. Sending handshake." msg = PeerHandshake.new msg.peerId = trackerclient.peerId msg.infoHash = peer.infoHash msg.serializeTo currentIo peer.state = :handshaking @reactor.scheduleTimer(@handshakeTimeout, [:handshake_timeout, peer], false) @logger.debug "Done sending handshake." # Send bitfield sendBitfield(currentIo, torrentData.blockState.completePieceBitfield) if torrentData.blockState setReadRateLimit(torrentData.downRateLimit) if torrentData.downRateLimit setWriteRateLimit(torrentData.upRateLimit) if torrentData.upRateLimit end |
#error(peer, details) ⇒ Object
Reactor method called when an IO error occurs.
714 715 716 717 718 719 720 721 722 723 724 725 |
# File 'lib/quartz_torrent/peerclient.rb', line 714 def error(peer, details) # If a peer closes the connection during handshake before we determine their id, we don't have a completed # Peer object yet. In this case the peer parameter is the symbol :listener_socket if peer == :listener_socket @logger.info "Error with handshaking peer: #{details}. Closing connection." else @logger.info "Error with peer #{peer}: #{details}. Closing connection." setPeerDisconnected(peer) end # Close connection close end |
#getDelegateTorrentData(infoHash = nil) ⇒ Object
Get a hash of new TorrentDataDelegate objects keyed by torrent infohash. This method is meant to be called from a different thread than the one the reactor is running in. This method is not immediate but blocks until the data is prepared. If infoHash is passed, only that torrent data is returned (still in a hashtable; just one entry)
422 423 424 425 426 427 428 429 430 431 432 433 434 |
# File 'lib/quartz_torrent/peerclient.rb', line 422 def getDelegateTorrentData(infoHash = nil) # Use an immediate, non-recurring timer. result = {} return result if stopped? semaphore = Semaphore.new timer = @reactor.scheduleTimer(0, [:get_torrent_data, result, semaphore, infoHash], false, true) if semaphore.wait(3) result else @logger.warn "getDelegateTorrentData: Waiting on semaphore timed out" throw "Waiting on semaphore for timer #{timer.object_id} timed out" end end |
#recvData(peer) ⇒ Object
Reactor method called when there is data ready to be read from a socket
581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 |
# File 'lib/quartz_torrent/peerclient.rb', line 581 def recvData(peer) msg = nil @logger.debug "Got data from peer #{peer}" if peer.state == :handshaking # Read handshake message begin @logger.debug "Reading handshake from #{peer}" msg = PeerHandshake.unserializeFrom currentIo rescue @logger.warn "Peer #{peer} failed handshake: #{$!}" setPeerDisconnected(peer) close return end else begin @logger.debug "Reading wire-message from #{peer}" msg = peer.peerMsgSerializer.unserializeFrom currentIo #msg = PeerWireMessage.unserializeFrom currentIo rescue EOFError @logger.info "Peer #{peer} disconnected." setPeerDisconnected(peer) close return rescue @logger.warn "Unserializing message from peer #{peer} failed: #{$!}" @logger.warn $!.backtrace.join "\n" setPeerDisconnected(peer) close return end peer.updateUploadRate msg torrentData = @torrentData[peer.infoHash] torrentData.bytesDownloaded += msg.length if torrentData @logger.debug "Peer #{peer} upload rate: #{peer.uploadRate.value} data only: #{peer.uploadRateDataOnly.value}" end if msg.is_a? PeerHandshake # This is a remote peer that we connected to returning our handshake. processHandshake(msg, peer) peer.state = :established peer.amChoked = true peer.peerChoked = true peer.amInterested = false peer.peerInterested = false elsif msg.is_a? BitfieldMessage @logger.debug "Received bitfield message from peer." handleBitfield(msg, peer) elsif msg.is_a? Unchoke @logger.debug "Received unchoke message from peer." peer.amChoked = false elsif msg.is_a? Choke @logger.debug "Received choke message from peer." peer.amChoked = true elsif msg.is_a? Interested @logger.debug "Received interested message from peer." peer.peerInterested = true elsif msg.is_a? Uninterested @logger.debug "Received uninterested message from peer." peer.peerInterested = false elsif msg.is_a? Piece @logger.debug "Received piece message from peer for torrent #{QuartzTorrent.bytesToHex(peer.infoHash)}: piece #{msg.pieceIndex} offset #{msg.blockOffset} length #{msg.data.length}." handlePieceReceive(msg, peer) elsif msg.is_a? Request @logger.debug "Received request message from peer for torrent #{QuartzTorrent.bytesToHex(peer.infoHash)}: piece #{msg.pieceIndex} offset #{msg.blockOffset} length #{msg.blockLength}." handleRequest(msg, peer) elsif msg.is_a? Have @logger.debug "Received have message from peer for torrent #{QuartzTorrent.bytesToHex(peer.infoHash)}: piece #{msg.pieceIndex}" handleHave(msg, peer) elsif msg.is_a? KeepAlive @logger.debug "Received keep alive message from peer." elsif msg.is_a? ExtendedHandshake @logger.debug "Received extended handshake message from peer." handleExtendedHandshake(msg, peer) elsif msg.is_a? ExtendedMetaInfo @logger.debug "Received extended metainfo message from peer." handleExtendedMetainfo(msg, peer) else @logger.warn "Received a #{msg.class} message but handler is not implemented" end end |
#removeTorrent(infoHash, deleteFiles = false) ⇒ Object
Remove a torrent.
300 301 302 303 304 |
# File 'lib/quartz_torrent/peerclient.rb', line 300 def removeTorrent(infoHash, deleteFiles = false) # Can't do this right now, since it could be in use by an event handler. Use an immediate, non-recurring timer instead. @logger.info "#{QuartzTorrent.bytesToHex(infoHash)}: Scheduling immediate timer to remove torrent. #{deleteFiles ? "Will" : "Wont"} delete downloaded files." @reactor.scheduleTimer(0, [:removetorrent, infoHash, deleteFiles], false, true) end |
#serverInit(metadata, addr, port) ⇒ Object
Reactor method called when a peer has connected to us.
450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 |
# File 'lib/quartz_torrent/peerclient.rb', line 450 def serverInit(, addr, port) # A peer connected to us # Read handshake message @logger.warn "Peer connection from #{addr}:#{port}" begin msg = PeerHandshake.unserializeExceptPeerIdFrom currentIo rescue @logger.warn "Peer failed handshake: #{$!}" close return end torrentData = torrentDataForHandshake(msg, "#{addr}:#{port}") # Are we tracking this torrent? if !torrentData @logger.warn "Peer sent handshake for unknown torrent" close return end trackerclient = torrentData.trackerClient # If we already have too many connections, don't allow this connection. classifiedPeers = ClassifiedPeers.new torrentData.peers.all if classifiedPeers.establishedPeers.length > @targetActivePeerCount @logger.warn "Closing connection to peer from #{addr}:#{port} because we already have #{classifiedPeers.establishedPeers.length} active peers which is > the target count of #{@targetActivePeerCount} " close return end # Send handshake outgoing = PeerHandshake.new outgoing.peerId = trackerclient.peerId outgoing.infoHash = torrentData.infoHash outgoing.serializeTo currentIo # Send extended handshake if the peer supports extensions if (msg.reserved.unpack("C8")[5] & 0x10) != 0 @logger.warn "Peer supports extensions. Sending extended handshake" extended = Extension.createExtendedHandshake torrentData.info extended.serializeTo currentIo end # Read incoming handshake's peerid msg.peerId = currentIo.read(PeerHandshake::PeerIdLen) if msg.peerId == trackerclient.peerId @logger.info "We got a connection from ourself. Closing connection." close return end peer = nil peers = torrentData.peers.findById(msg.peerId) if peers peers.each do |existingPeer| if existingPeer.state != :disconnected @logger.warn "Peer with id #{msg.peerId} created a new connection when we already have a connection in state #{existingPeer.state}. Closing new connection." close return else if existingPeer.trackerPeer.ip == addr && existingPeer.trackerPeer.port == port peer = existingPeer end end end end if ! peer peer = Peer.new(TrackerPeer.new(addr, port)) updatePeerWithHandshakeInfo(torrentData, msg, peer) torrentData.peers.add peer if ! peers @logger.warn "Unknown peer with id #{msg.peerId} connected." else @logger.warn "Known peer with id #{msg.peerId} connected from new location." end else @logger.warn "Known peer with id #{msg.peerId} connected from known location." end @logger.info "Peer #{peer} connected to us. " peer.state = :established peer.amChoked = true peer.peerChoked = true peer.amInterested = false peer.peerInterested = false if torrentData.info peer.bitfield = Bitfield.new(torrentData.info.pieces.length) else peer.bitfield = EmptyBitfield.new @logger.info "We have no metainfo yet, so setting peer #{peer} to have an EmptyBitfield" end # Send bitfield sendBitfield(currentIo, torrentData.blockState.completePieceBitfield) if torrentData.blockState setMetaInfo(peer) setReadRateLimit(torrentData.downRateLimit) if torrentData.downRateLimit setWriteRateLimit(torrentData.upRateLimit) if torrentData.upRateLimit end |
#setDownloadRateLimit(infoHash, bytesPerSecond) ⇒ Object
Set the download rate limit. Pass nil as the bytesPerSecond to disable the limit.
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 |
# File 'lib/quartz_torrent/peerclient.rb', line 314 def setDownloadRateLimit(infoHash, bytesPerSecond) torrentData = @torrentData[infoHash] if ! torrentData @logger.warn "Asked to set download rate limit for a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}" return end if bytesPerSecond if ! torrentData.downRateLimit torrentData.downRateLimit = RateLimit.new(bytesPerSecond, 2*bytesPerSecond, 0) else torrentData.downRateLimit.unitsPerSecond = bytesPerSecond end else torrentData.downRateLimit = nil end torrentData.peers.all.each do |peer| withPeersIo(peer, "setting download rate limit") do |io| io.readRateLimit = torrentData.downRateLimit end end end |
#setPaused(infoHash, value) ⇒ Object
Pause or unpause the specified torrent.
307 308 309 310 311 |
# File 'lib/quartz_torrent/peerclient.rb', line 307 def setPaused(infoHash, value) # Can't do this right now, since it could be in use by an event handler. Use an immediate, non-recurring timer instead. @logger.info "#{QuartzTorrent.bytesToHex(infoHash)}: Scheduling immediate timer to #{value ? "pause" : "unpause"} torrent." @reactor.scheduleTimer(0, [:pausetorrent, infoHash, value], false, true) end |
#setUploadDuration(infoHash, seconds) ⇒ Object
Set the maximum amount of time (in seconds) that a torrent can be in the upload-only state before it is paused. Pass nil to disable.
377 378 379 380 381 382 383 384 385 |
# File 'lib/quartz_torrent/peerclient.rb', line 377 def setUploadDuration(infoHash, seconds) torrentData = @torrentData[infoHash] if ! torrentData @logger.warn "Asked to set upload duration for a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}" return end torrentData.uploadDuration = seconds end |
#setUploadRateLimit(infoHash, bytesPerSecond) ⇒ Object
Set the upload rate limit. Pass nil as the bytesPerSecond to disable the limit.
340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 |
# File 'lib/quartz_torrent/peerclient.rb', line 340 def setUploadRateLimit(infoHash, bytesPerSecond) torrentData = @torrentData[infoHash] if ! torrentData @logger.warn "Asked to set upload rate limit for a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}" return end if bytesPerSecond if ! torrentData.upRateLimit torrentData.upRateLimit = RateLimit.new(bytesPerSecond, 2*bytesPerSecond, 0) else torrentData.upRateLimit.unitsPerSecond = bytesPerSecond end else torrentData.upRateLimit = nil end torrentData.peers.all.each do |peer| withPeersIo(peer, "setting upload rate limit") do |io| io.writeRateLimit = torrentData.upRateLimit end end end |
#setUploadRatio(infoHash, ratio) ⇒ Object
Set the upload ratio. Pass nil to disable
365 366 367 368 369 370 371 372 373 |
# File 'lib/quartz_torrent/peerclient.rb', line 365 def setUploadRatio(infoHash, ratio) torrentData = @torrentData[infoHash] if ! torrentData @logger.warn "Asked to set upload ratio limit for a non-existent torrent #{QuartzTorrent.bytesToHex(infoHash)}" return end torrentData.ratio = ratio end |
#timerExpired(metadata) ⇒ Object
Reactor method called when a scheduled timer expires.
668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 |
# File 'lib/quartz_torrent/peerclient.rb', line 668 def timerExpired() if .is_a?(Array) && [0] == :manage_peers managePeers([1]) elsif .is_a?(Array) && [0] == :request_blocks requestBlocks([1]) elsif .is_a?(Array) && [0] == :check_piece_manager checkPieceManagerResults([1]) elsif .is_a?(Array) && [0] == :handshake_timeout handleHandshakeTimeout([1]) elsif .is_a?(Array) && [0] == :removetorrent handleRemoveTorrent([1], [2]) elsif .is_a?(Array) && [0] == :pausetorrent handlePause([1], [2]) elsif .is_a?(Array) && [0] == :get_torrent_data @torrentData.each do |k,v| begin if [3].nil? || k == [3] v = TorrentDataDelegate.new(v, self) [1][k] = v end rescue @logger.error "Error building torrent data response for user: #{$!}" @logger.error "#{$!.backtrace.join("\n")}" end end [2].signal elsif .is_a?(Array) && [0] == :update_torrent_data delegate = [1] if ! @torrentData.has_key?(infoHash) delegate.state = :deleted else delegate.internalRefresh end [2].signal elsif .is_a?(Array) && [0] == :request_metadata_pieces requestMetadataPieces([1]) elsif .is_a?(Array) && [0] == :check_metadata_piece_manager checkMetadataPieceManagerResults([1]) elsif .is_a?(Array) && [0] == :runproc [1].call else @logger.info "Unknown timer #{} expired." end end |
#updateDelegateTorrentData(delegate) ⇒ Object
Update the data stored in a TorrentDataDelegate to the latest information.
437 438 439 440 441 442 443 444 |
# File 'lib/quartz_torrent/peerclient.rb', line 437 def updateDelegateTorrentData(delegate) return if stopped? # Use an immediate, non-recurring timer. semaphore = Semaphore.new @reactor.scheduleTimer(0, [:update_torrent_data, delegate, semaphore], false, true) semaphore.wait result end |