Class: QuartzTorrent::Reactor

Inherits:
Object
Defined in:
lib/quartz_torrent/reactor.rb

Overview

This class implements the Reactor pattern. The Reactor listens for activity on IO objects and calls methods on an associated Handler object when activity is detected. Callers can use listen, connect or open to register IO objects with the reactor.

This Reactor is implemented using Fibers in such a way that when activity is detected on an IO, the handler can perform reads of N bytes without blocking and without needing to buffer. For example, the handler may call:

msg = io.read(300)

when it knows it must read 300 bytes. If only 100 are available, the handler is cooperatively preempted and later resumed when more bytes are available, so that the read seems atomic while also not blocking.
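
The idea can be illustrated with a minimal sketch that uses only Ruby's core Fiber and IO classes. It is not QuartzTorrent's implementation: fiber_read and demo_fiber_read are hypothetical names, and a pipe stands in for a socket. The read runs inside a Fiber and yields whenever the data is not there yet, so the caller decides when to resume it.

```ruby
# Read exactly `len` bytes from `io` inside a Fiber. When fewer bytes are
# available, yield to the caller instead of blocking.
def fiber_read(io, len)
  buffer = String.new
  while buffer.bytesize < len
    chunk = io.read_nonblock(len - buffer.bytesize, exception: false)
    case chunk
    when :wait_readable
      Fiber.yield :need_more # hand control back to the event loop
    when nil
      raise EOFError, "stream closed after #{buffer.bytesize} of #{len} bytes"
    else
      buffer << chunk
    end
  end
  buffer
end

# Feed 300 bytes in two steps and resume the fiber after each step.
def demo_fiber_read
  reader, writer = IO.pipe
  handler = Fiber.new { fiber_read(reader, 300) }

  writer.write('a' * 100)
  first = handler.resume   # only 100 bytes available: the fiber yields
  writer.write('b' * 200)
  message = handler.resume # the remaining bytes arrived: the read completes
  [first, message]
ensure
  reader.close if reader
  writer.close if writer
end
```

From the handler's point of view the call to fiber_read looks like one ordinary read. In a real reactor the event loop would resume the fiber only after select() reports the IO as readable again.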

Defined Under Namespace

Classes: InternalTimerInfo

Constructor Details

#initialize(handler, logger = nil) ⇒ Reactor

Create a new reactor that uses the passed handler.



# File 'lib/quartz_torrent/reactor.rb', line 393

def initialize(handler, logger = nil)
  raise "Reactor.new called with nil handler. Handler can't be nil" if handler.nil?

  @stopped = false
  @handler = handler
  @handler.reactor = self

  # Hash of IOInfo objects, keyed by io.
  @ioInfo = {}
  @timerManager = TimerManager.new(logger)
  @currentIoInfo = nil
  @logger = logger
  @listenBacklog = 10
  @eventRead, @eventWrite = IO.pipe
  @currentEventPipeChars = 0
  @currentHandlerCallback = nil
  @userEvents = []
end

Instance Attribute Details

#listenBacklog ⇒ Object

Returns the value of attribute listenBacklog.



# File 'lib/quartz_torrent/reactor.rb', line 412

def listenBacklog
  @listenBacklog
end

Instance Method Details

#addUserEvent(event) ⇒ Object

Add a generic event. This event will be processed on the next pass through the event loop.



# File 'lib/quartz_torrent/reactor.rb', line 460

def addUserEvent(event)
  @userEvents.push event
end

#cancelTimer(timerInfo) ⇒ Object

Meant to be called from the handler. Cancels a timer that was scheduled with scheduleTimer.



# File 'lib/quartz_torrent/reactor.rb', line 512

def cancelTimer(timerInfo)
  @timerManager.cancel timerInfo
end

#close(io = nil) ⇒ Object

Meant to be called from the handler. Closes the passed io or, if it is nil, the current io.



# File 'lib/quartz_torrent/reactor.rb', line 537

def close(io = nil)
  if ! io
    disposeIo @currentIoInfo if @currentIoInfo
  else
    disposeIo io
  end
end

#closeIo(io) ⇒ Object

Helper used by disposeIo. Closes the io if it is still open and logs any exception raised while closing.



# File 'lib/quartz_torrent/reactor.rb', line 781

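def closeIo(io)
  begin
    io.close if !io.closed?
  rescue
    # @logger may be nil (see #initialize), so guard the log calls as the other methods do.
    @logger.warn "Closing IO failed with exception #{$!}" if @logger
    @logger.debug $!.backtrace.join("\n") if @logger
  end
end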

#connect(addr, port, metainfo, timeout = nil) ⇒ Object

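Create a TCP connection to the specified host and port. If a timeout is given and the connection is still in progress, a one-shot connect-timeout timer is scheduled. Note that this method may raise exceptions; for example, ‘Too many open files’ may be raised if the process is using too many file descriptors.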



# File 'lib/quartz_torrent/reactor.rb', line 422

def connect(addr, port, metainfo, timeout = nil)
  ioInfo = startConnection(port, addr, metainfo)
  @ioInfo[ioInfo.io] = ioInfo
  if timeout && ioInfo.state == :connecting
    ioInfo.connectTimeout = timeout
    ioInfo.connectTimer = scheduleTimer(timeout, InternalTimerInfo.new(:connect_timeout, ioInfo), false)
  end
end
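
The pattern of starting a connection and giving up after a timeout can be sketched with the standard socket library alone. This is an illustration under assumptions, not the body of startConnection (which is not shown here): connect_with_timeout and demo_connect are hypothetical names, and the sketch waits in select() itself where the Reactor instead schedules a timer and keeps running its event loop.

```ruby
require 'socket'

# Start a non-blocking TCP connect and wait up to `timeout` seconds for it
# to finish. Returns the connected socket, or nil if the wait timed out.
def connect_with_timeout(host, port, timeout)
  socket = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
  sockaddr = Socket.pack_sockaddr_in(port, host)
  result = socket.connect_nonblock(sockaddr, exception: false)
  if result == :wait_writable
    # Still connecting: the socket becomes writable once the attempt ends.
    unless IO.select(nil, [socket], nil, timeout)
      socket.close
      return nil
    end
    begin
      # A second call reports the outcome; EISCONN means it succeeded.
      socket.connect_nonblock(sockaddr)
    rescue Errno::EISCONN
      # already connected
    end
  end
  socket
end

# Connect to a throwaway local server on an OS-assigned port.
def demo_connect
  server = TCPServer.new('127.0.0.1', 0)
  socket = connect_with_timeout('127.0.0.1', server.addr[1], 2)
  !socket.nil?
ensure
  socket.close if socket && !socket.closed?
  server.close if server
end
```

A failed attempt (for example a refused connection) surfaces as an exception from the second connect_nonblock call, which matches the note above that connecting may raise.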

#currentIo ⇒ Object

Meant to be called from the handler. Returns the current io.



# File 'lib/quartz_torrent/reactor.rb', line 546

def currentIo
  @currentIoInfo.readFiberIoFacade
end

#findIoByMetainfo(metainfo) ⇒ Object

Meant to be called from the handler. Find an IO by metainfo. The == operator is used to match the metainfo.



# File 'lib/quartz_torrent/reactor.rb', line 569

def findIoByMetainfo(metainfo)
  @ioInfo.each_value do |info|
    if info.metainfo == metainfo
      io = info.readFiberIoFacade
      # Don't allow read calls from timer handlers. This is to prevent a complex situation.
      # See the processTimer call in eventLoopBody for more info
      io = WriteOnlyIoFacade.new(info) if @currentHandlerCallback == :timer
      return io
    end
  end
  nil
end
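
Because matching uses ==, the choice of metainfo type affects what a lookup finds. The sketch below is a stand-alone illustration with hypothetical names (PeerKey, Entry, find_io_by_metainfo and demo_find are not part of QuartzTorrent): a Struct compares by value, while a plain Object compares by identity.

```ruby
# Hypothetical stand-ins: PeerKey plays the role of caller metainfo and
# Entry the role of the reactor's per-IO record.
PeerKey = Struct.new(:host, :port)
Entry = Struct.new(:io, :metainfo)

# Return the io of the first entry whose metainfo == the argument, else nil.
def find_io_by_metainfo(entries, metainfo)
  entries.each do |entry|
    return entry.io if entry.metainfo == metainfo
  end
  nil
end

def demo_find
  opaque = Object.new
  entries = [
    Entry.new(:io_a, PeerKey.new('10.0.0.1', 6881)),
    Entry.new(:io_b, opaque)
  ]
  [
    find_io_by_metainfo(entries, PeerKey.new('10.0.0.1', 6881)), # equal by value
    find_io_by_metainfo(entries, opaque),                        # same object
    find_io_by_metainfo(entries, Object.new)                     # different object
  ]
end
```

With a value-comparable metainfo the handler can rebuild the key when it needs the IO; with an opaque object it has to keep the original reference.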

#listen(addr, port, metainfo) ⇒ Object

Create a TCP server that listens for connections on the specified port. In the code shown below the addr argument is not used; the listener always binds to 0.0.0.0.



# File 'lib/quartz_torrent/reactor.rb', line 433

def listen(addr, port, metainfo)
  listener = Socket.new( AF_INET, SOCK_STREAM, 0 )
  sockaddr = Socket.pack_sockaddr_in( port, "0.0.0.0" )
  listener.setsockopt(Socket::SOL_SOCKET,Socket::SO_REUSEADDR, true)
  listener.bind( sockaddr )
  @logger.debug "listening on port #{port}" if @logger
  listener.listen( @listenBacklog )
             
  info = IOInfo.new(listener, metainfo)
  info.readFiberIoFacade.logger = @logger if @logger
  info.state = :listening
  @ioInfo[info.io] = info
end
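
For readers unfamiliar with the low-level Socket calls used above, here is a small stand-alone sketch of the same setup, followed by the way a pending connection shows up as readability of the listening socket. The names open_listener and demo_listener are hypothetical, and the host parameter is an addition made so the demo can stay on the loopback interface.

```ruby
require 'socket'

# Build a listening TCP socket the same way #listen does. Port 0 asks the
# OS for any free port; `backlog` plays the role of @listenBacklog.
def open_listener(port, backlog = 10, host = '0.0.0.0')
  listener = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
  listener.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, true)
  listener.bind(Socket.pack_sockaddr_in(port, host))
  listener.listen(backlog)
  listener
end

# Connect a client, then wait for the listener to become readable.
def demo_listener
  listener = open_listener(0, 10, '127.0.0.1')
  port = listener.local_address.ip_port
  client = TCPSocket.new('127.0.0.1', port)

  # A listening socket is reported readable when a connection is pending.
  readable, = IO.select([listener], nil, nil, 5)
  server_side, = listener.accept
  client.write('hi')
  [!readable.nil?, server_side.read(2)]
ensure
  [client, server_side, listener].each { |s| s.close if s && !s.closed? }
end
```

An event loop built on select() can therefore treat listeners and connected sockets uniformly and decide what to do based on the state it recorded for each IO, such as :listening above.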

#open(path, mode, metainfo, useErrorhandler = true) ⇒ Object

Open the specified file in the specified mode and register it with the reactor.



# File 'lib/quartz_torrent/reactor.rb', line 448

def open(path, mode, metainfo, useErrorhandler = true)
  file = File.open(path, mode)

  info = IOInfo.new(file, metainfo, true)
  info.useErrorhandler = useErrorhandler
  info.readFiberIoFacade.logger = @logger if @logger
  info.state = :connected
  @ioInfo[info.io] = info
end

#read(len) ⇒ Object

Meant to be called from the handler. Read ‘len’ bytes from the current IO.



# File 'lib/quartz_torrent/reactor.rb', line 527

def read(len)
  if @currentIoInfo
    # This is meant to be called from inside a fiber. Should add a check to confirm that here.
    @currentIoInfo.readFiberIoFacade.read(len)
  else
    raise "Reactor.read called with no current io. Was it called from a timer handler?"
  end
end

#scheduleTimer(duration, metainfo = nil, recurring = true, immed = false) ⇒ Object

Schedule a timer. Parameter ‘duration’ specifies the timer duration in seconds, ‘metainfo’ is caller information passed to the handler when the timer expires, ‘recurring’ should be true if the timer will repeat, or false if it will only expire once, and ‘immed’ when true specifies that the timer should expire immediately (and again each duration if recurring) while false specifies that the timer will only expire the first time after its duration elapses.



# File 'lib/quartz_torrent/reactor.rb', line 499

def scheduleTimer(duration, metainfo = nil, recurring = true, immed = false)
  timerInfo = @timerManager.add(duration, metainfo, recurring, immed)
  # This timer may expire sooner than the sleep currently in progress in select(), so
  # write to the event pipe to make sure we break out of select().
  if @currentEventPipeChars == 0
    @eventWrite.write 'x'
    @currentEventPipeChars += 1
    @eventWrite.flush
  end
  timerInfo
end
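
The interaction of ‘recurring’ and ‘immed’ is easier to see with concrete numbers. The helper below is a hypothetical illustration of the documented behavior, not code from TimerManager: it lists the offsets, in seconds after scheduling, at which a timer would expire within a given horizon.

```ruby
# List the expiry offsets (seconds after scheduling) that fall within
# `horizon` seconds, following the documented meaning of the flags.
def expiry_times(duration, recurring, immed, horizon)
  raise ArgumentError, 'duration must be positive' unless duration > 0

  times = []
  t = immed ? 0 : duration # immed: the first expiry happens right away
  while t <= horizon
    times << t
    break unless recurring # one-shot timers expire only once
    t += duration
  end
  times
end

expiry_times(10, true, false, 35)  # => [10, 20, 30]
expiry_times(10, true, true, 35)   # => [0, 10, 20, 30]
expiry_times(10, false, false, 35) # => [10]
expiry_times(10, false, true, 35)  # => [0]
```

The connect-timeout timer created in #connect is an example of the third case: it passes false for ‘recurring’ and leaves ‘immed’ at its default.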

#setMetaInfo(metainfo) ⇒ Object

Meant to be called from the handler. Sets the meta info for the current io.



# File 'lib/quartz_torrent/reactor.rb', line 551

def setMetaInfo(metainfo)
  @currentIoInfo.metainfo = metainfo
end

#setReadRateLimit(rate) ⇒ Object

Meant to be called from the handler. Sets the max rate at which the current io can read.



# File 'lib/quartz_torrent/reactor.rb', line 556

def setReadRateLimit(rate)
  raise "The argument must be a RateLimit" if ! rate.nil? && !rate.is_a?(RateLimit)
  @currentIoInfo.readRateLimit = rate
end

#setWriteRateLimit(rate) ⇒ Object

Meant to be called from the handler. Sets the max rate at which the current io can be written to.



# File 'lib/quartz_torrent/reactor.rb', line 562

def setWriteRateLimit(rate)
  raise "The argument must be a RateLimit" if ! rate.nil? && !rate.is_a?(RateLimit)
  @currentIoInfo.writeRateLimit = rate
end

#start ⇒ Object

Run the event loop.



# File 'lib/quartz_torrent/reactor.rb', line 465

def start
  while true
    begin
      break if eventLoopBody == :halt
    rescue
      @logger.error "Unexpected exception in reactor event loop: #{$!}" if @logger
      @logger.error $!.backtrace.join "\n" if @logger
    end
  end

  @logger.info "Reactor shutting down" if @logger

  # Event loop finished
  @ioInfo.each do |k,v|
    k.close
  end

end
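
The shape of this loop (run one pass, log and continue on an exception, leave when the pass returns :halt) can be tried in isolation. The class below is a toy stand-in with hypothetical names (TinyLoop, add_user_event, event_loop_body). It assumes that a pass returns :halt once stop has been requested, which the listing above does not show, and it processes queued user events where the real reactor would block in select().

```ruby
# A toy event loop with the same outer structure as Reactor#start.
class TinyLoop
  attr_reader :handled, :errors

  def initialize
    @stopped = false
    @user_events = []
    @handled = []
    @errors = []
  end

  def add_user_event(event)
    @user_events.push event
  end

  def stop
    @stopped = true
  end

  def start
    loop do
      begin
        break if event_loop_body == :halt
      rescue StandardError => e
        @errors << e.message # record the failure and keep looping
      end
    end
  end

  private

  # One pass: handle a single queued event, or halt when there is no work.
  def event_loop_body
    return :halt if @stopped

    event = @user_events.shift
    return :halt if event.nil? # a real reactor would block in select() here
    raise 'bad event' if event == :boom

    @handled << event
    stop if event == :quit
    nil
  end
end
```

Queueing :a, :boom, :b, :quit and :never and then calling start handles :a, :b and :quit, records one error for :boom, and never reaches :never because the loop halts on the pass after :quit.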

#stop ⇒ Object

Stop the event loop.



# File 'lib/quartz_torrent/reactor.rb', line 485

def stop
  @stopped = true
  return if @currentEventPipeChars > 0
  @eventWrite.write 'x'
  @currentEventPipeChars += 1
  @eventWrite.flush
end
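
Both #stop and #scheduleTimer write a single byte to a pipe so that a select() call waiting on the pipe's read end returns, and both skip the write when a byte is already pending. A minimal sketch of that wake-up technique follows. Waker and demo_waker are hypothetical names, and the way the pending byte is drained here is an assumption, since the reactor's own draining code is not shown on this page.

```ruby
# A wake-up channel built on a pipe, with at most one pending byte.
class Waker
  attr_reader :pending

  def initialize
    @event_read, @event_write = IO.pipe
    @pending = 0
  end

  # Queue a wake-up unless one is already pending, mirroring the
  # @currentEventPipeChars check.
  def wake
    return if @pending > 0

    @event_write.write 'x'
    @pending += 1
  end

  # Wait up to `timeout` seconds. Returns true if a wake-up was pending.
  def wait(timeout)
    readable, = IO.select([@event_read], nil, nil, timeout)
    return false unless readable

    @event_read.read_nonblock(@pending) # drain the queued byte
    @pending = 0
    true
  end

  def close
    @event_read.close
    @event_write.close
  end
end

def demo_waker
  waker = Waker.new
  waker.wake
  waker.wake                       # ignored: a byte is already pending
  pending_after_two_wakes = waker.pending
  woken = waker.wait(1)            # returns at once because a byte is queued
  idle = waker.wait(0.05)          # nothing queued: select() times out
  [pending_after_two_wakes, woken, idle]
ensure
  waker.close if waker
end
```

The counter is not synchronized in this sketch, so calling wake from several threads at once would need a Mutex around the check and the write.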

#stopped? ⇒ Boolean

Returns true if the reactor is stopped.

Returns:

  • (Boolean)


# File 'lib/quartz_torrent/reactor.rb', line 415

def stopped?
  @stopped
end

#write(data) ⇒ Object

Meant to be called from the handler. Adds the specified data to the outgoing queue for the current io.



# File 'lib/quartz_torrent/reactor.rb', line 517

def write(data)
  if @currentIoInfo
    # This is meant to be called from inside a fiber. Should add a check to confirm that here.
    @currentIoInfo.readFiberIoFacade.write(data)
  else
    raise "Reactor.write called with no current io. Was it called from a timer handler?"
  end
end