Class: QuartzTorrent::Reactor
- Inherits: Object
- Defined in: lib/quartz_torrent/reactor.rb
Overview
This class implements the Reactor pattern. The Reactor listens for activity on IO objects and calls methods on an associated Handler object when activity is detected. Callers can use listen, connect or open to register IO objects with the reactor.
This Reactor is implemented using Fibers in such a way that when activity is detected on an IO, the handler can perform reads of N bytes without blocking and without needing to buffer. For example, the handler may call:
msg = io.read(300)
when it knows it must read 300 bytes. If only 100 are available, the handler is cooperatively preempted and later resumed when more bytes are available, so that the read seems atomic while also not blocking.
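The cooperative read described above can be illustrated with a small standalone sketch. It does not use QuartzTorrent; the FiberReadFacade class and its feed method are hypothetical names invented for this illustration, and the loop at the bottom stands in for the reactor's event loop.

```ruby
# Hypothetical facade (not part of QuartzTorrent): buffers incoming chunks
# and suspends the calling fiber until enough bytes have arrived.
class FiberReadFacade
  def initialize
    @buffer = String.new
  end

  # Called by the event loop when new bytes arrive on the IO.
  def feed(chunk)
    @buffer << chunk
  end

  # Called from inside a fiber. Yields back to the event loop until
  # `len` bytes are buffered, then returns exactly `len` bytes.
  def read(len)
    Fiber.yield while @buffer.bytesize < len
    data = @buffer.byteslice(0, len)
    @buffer = @buffer.byteslice(len..-1)
    data
  end
end

facade = FiberReadFacade.new
messages = []
resumes = 0

# The "handler": it simply asks for 300 bytes, as in the overview.
reader = Fiber.new do
  messages << facade.read(300)
end

# The stand-in "event loop": data arrives in 100-byte chunks and the
# handler fiber is resumed after each chunk.
3.times do
  facade.feed("x" * 100)
  reader.resume
  resumes += 1
end

puts "handler received #{messages.first.bytesize} bytes after #{resumes} resumes"
```

From the handler's point of view the read is a single call, even though the fiber was suspended twice while waiting for the remaining bytes.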
Defined Under Namespace
Classes: InternalTimerInfo
Instance Attribute Summary
- #listenBacklog ⇒ Object
  Returns the value of attribute listenBacklog.
Instance Method Summary
- #addUserEvent(event) ⇒ Object
  Add a generic event.
- #cancelTimer(timerInfo) ⇒ Object
  Meant to be called from the handler. Cancels the timer scheduled with scheduleTimer.
- #close(io = nil) ⇒ Object
  Meant to be called from the handler. Closes the passed io, or the current io if none is passed.
- #closeIo(io) ⇒ Object
  Inner function in disposeIo.
- #connect(addr, port, metainfo, timeout = nil) ⇒ Object
  Create a TCP connection to the specified host.
- #currentIo ⇒ Object
  Meant to be called from the handler. Returns the current io.
- #findIoByMetainfo(metainfo) ⇒ Object
  Meant to be called from the handler. Finds an IO by metainfo.
- #initialize(handler, logger = nil) ⇒ Reactor (constructor)
  Create a new reactor that uses the passed handler.
- #listen(addr, port, metainfo) ⇒ Object
  Create a TCP server that listens for connections on the specified port.
- #open(path, mode, metainfo, useErrorhandler = true) ⇒ Object
  Open the specified file in the specified mode.
- #read(len) ⇒ Object
  Meant to be called from the handler. Reads len bytes from the current IO.
- #scheduleTimer(duration, metainfo = nil, recurring = true, immed = false) ⇒ Object
  Schedule a timer.
- #setMetaInfo(metainfo) ⇒ Object
  Meant to be called from the handler. Sets the meta info for the current io.
- #setReadRateLimit(rate) ⇒ Object
  Meant to be called from the handler. Sets the max rate at which the current io can read.
- #setWriteRateLimit(rate) ⇒ Object
  Meant to be called from the handler. Sets the max rate at which the current io can be written to.
- #start ⇒ Object
  Run event loop.
- #stop ⇒ Object
  Stop the event loop.
- #stopped? ⇒ Boolean
  Returns true if the reactor is stopped.
- #write(data) ⇒ Object
  Meant to be called from the handler. Adds the specified data to the outgoing queue for the current io.
Constructor Details
#initialize(handler, logger = nil) ⇒ Reactor
Create a new reactor that uses the passed handler. Raises an exception if the handler is nil.

    # File 'lib/quartz_torrent/reactor.rb', line 393
    def initialize(handler, logger = nil)
      raise "Reactor.new called with nil handler. Handler can't be nil" if handler.nil?

      @stopped = false
      @handler = handler
      @handler.reactor = self

      # Hash of IOInfo objects, keyed by io.
      @ioInfo = {}
      @timerManager = TimerManager.new(logger)
      @currentIoInfo = nil
      @logger = logger
      @listenBacklog = 10
      @eventRead, @eventWrite = IO.pipe
      @currentEventPipeChars = 0
      @currentHandlerCallback = nil
      @userEvents = []
    end
Instance Attribute Details
#listenBacklog ⇒ Object
Returns the value of attribute listenBacklog.

    # File 'lib/quartz_torrent/reactor.rb', line 412
    def listenBacklog
      @listenBacklog
    end
Instance Method Details
#addUserEvent(event) ⇒ Object
Add a generic event. The event is processed on the next pass through the event loop.

    # File 'lib/quartz_torrent/reactor.rb', line 460
    def addUserEvent(event)
      @userEvents.push event
    end
#cancelTimer(timerInfo) ⇒ Object
Meant to be called from the handler. Cancels the timer scheduled with scheduleTimer.

    # File 'lib/quartz_torrent/reactor.rb', line 512
    def cancelTimer(timerInfo)
      @timerManager.cancel timerInfo
    end
#close(io = nil) ⇒ Object
Meant to be called from the handler. Closes the passed io, or the current io if the argument is nil.

    # File 'lib/quartz_torrent/reactor.rb', line 537
    def close(io = nil)
      if ! io
        disposeIo @currentIoInfo if @currentIoInfo
      else
        disposeIo io
      end
    end
#closeIo(io) ⇒ Object
Inner function in disposeIo.

    # File 'lib/quartz_torrent/reactor.rb', line 781
    def closeIo(io)
      begin
        io.close if !io.closed?
      rescue
        @logger.warn "Closing IO failed with exception #{$!}"
        @logger.debug $!.backtrace.join("\n")
      end
    end
#connect(addr, port, metainfo, timeout = nil) ⇒ Object
Create a TCP connection to the specified host. Note that this method may raise exceptions. For example, ‘Too many open files’ might be raised if the process is using too many file descriptors. If a timeout is given and the connection is still in progress, a one-shot connect timer is scheduled for that duration.

    # File 'lib/quartz_torrent/reactor.rb', line 422
    def connect(addr, port, metainfo, timeout = nil)
      ioInfo = startConnection(port, addr, metainfo)
      @ioInfo[ioInfo.io] = ioInfo
      if timeout && ioInfo.state == :connecting
        ioInfo.connectTimeout = timeout
        ioInfo.connectTimer = scheduleTimer(timeout, InternalTimerInfo.new(:connect_timeout, ioInfo), false)
      end
    end
#currentIo ⇒ Object
Meant to be called from the handler. Returns the current io.

    # File 'lib/quartz_torrent/reactor.rb', line 546
    def currentIo
      @currentIoInfo.readFiberIoFacade
    end
#findIoByMetainfo(metainfo) ⇒ Object
Meant to be called from the handler. Finds an IO by metainfo. The == operator is used to match the metainfo. Returns nil if no IO matches. When called from a timer handler, the returned IO is write-only.

    # File 'lib/quartz_torrent/reactor.rb', line 569
    def findIoByMetainfo(metainfo)
      @ioInfo.each_value do |info|
        if info.metainfo == metainfo
          io = info.readFiberIoFacade
          # Don't allow read calls from timer handlers. This is to prevent a complex situation.
          # See the processTimer call in eventLoopBody for more info
          io = WriteOnlyIoFacade.new(info) if @currentHandlerCallback == :timer
          return io
        end
      end
      nil
    end
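Because the lookup relies on ==, the metainfo type needs value equality for a freshly built key to match a registered one. The following standalone sketch shows the idea with a Struct, which compares field by field. PeerTag, Entry and find_by_metainfo are hypothetical names used only for illustration and are not part of QuartzTorrent.

```ruby
# Hypothetical metainfo type: Struct provides field-by-field ==.
PeerTag = Struct.new(:host, :port)

# Hypothetical stand-in for the reactor's per-IO bookkeeping.
Entry = Struct.new(:metainfo, :io)

entries = [
  Entry.new(PeerTag.new("10.0.0.1", 6881), :io_a),
  Entry.new(PeerTag.new("10.0.0.2", 6881), :io_b)
]

# Returns the io of the first entry whose metainfo == the given one, or nil.
def find_by_metainfo(entries, metainfo)
  match = entries.find { |entry| entry.metainfo == metainfo }
  match && match.io
end

found = find_by_metainfo(entries, PeerTag.new("10.0.0.2", 6881))
missing = find_by_metainfo(entries, PeerTag.new("10.0.0.3", 6881))

puts found.inspect
puts missing.inspect
```

A plain object without a custom == would only match itself, so a caller using such metainfo would need to keep the original instance around.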
#listen(addr, port, metainfo) ⇒ Object
Create a TCP server that listens for connections on the specified port. Note that the code shown binds to 0.0.0.0 and does not use the addr argument.

    # File 'lib/quartz_torrent/reactor.rb', line 433
    def listen(addr, port, metainfo)
      listener = Socket.new( AF_INET, SOCK_STREAM, 0 )
      sockaddr = Socket.pack_sockaddr_in( port, "0.0.0.0" )
      listener.setsockopt(Socket::SOL_SOCKET,Socket::SO_REUSEADDR, true)
      listener.bind( sockaddr )
      @logger.debug "listening on port #{port}" if @logger
      listener.listen( @listenBacklog )

      info = IOInfo.new(listener, metainfo)
      info.readFiberIoFacade.logger = @logger if @logger
      info.state = :listening
      @ioInfo[info.io] = info
    end
#open(path, mode, metainfo, useErrorhandler = true) ⇒ Object
Open the specified file in the specified mode.

    # File 'lib/quartz_torrent/reactor.rb', line 448
    def open(path, mode, metainfo, useErrorhandler = true)
      file = File.open(path, mode)

      info = IOInfo.new(file, metainfo, true)
      info.useErrorhandler = useErrorhandler
      info.readFiberIoFacade.logger = @logger if @logger
      info.state = :connected
      @ioInfo[info.io] = info
    end
#read(len) ⇒ Object
Meant to be called from the handler. Reads ‘len’ bytes from the current IO. Raises an exception if there is no current IO, for example when called from a timer handler.

    # File 'lib/quartz_torrent/reactor.rb', line 527
    def read(len)
      if @currentIoInfo
        # This is meant to be called from inside a fiber. Should add a check to confirm that here.
        @currentIoInfo.readFiberIoFacade.read(len)
      else
        raise "Reactor.read called with no current io. Was it called from a timer handler?"
      end
    end
#scheduleTimer(duration, metainfo = nil, recurring = true, immed = false) ⇒ Object
Schedule a timer. ‘duration’ is the timer duration in seconds. ‘metainfo’ is caller information passed to the handler when the timer expires. ‘recurring’ should be true if the timer repeats, or false if it expires only once. When ‘immed’ is true, the timer expires immediately (and again after each duration if recurring); when false, the timer first expires only after its duration elapses. Returns the timer info object, which can be passed to cancelTimer.

    # File 'lib/quartz_torrent/reactor.rb', line 499
    def scheduleTimer(duration, metainfo = nil, recurring = true, immed = false)
      timerInfo = @timerManager.add(duration, metainfo, recurring, immed)
      # This timer may expire sooner than the current sleep we are doing in select(). To make
      # sure we will write to the event pipe to break out of select().
      if @currentEventPipeChars == 0
        @eventWrite.write 'x'
        @currentEventPipeChars += 1
        @eventWrite.flush
      end
      timerInfo
    end
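The interaction between ‘recurring’ and ‘immed’ can be made concrete with a small standalone sketch. The expiry_offsets helper below is hypothetical and is not part of QuartzTorrent or its TimerManager; it only lists when a timer would be expected to expire, in seconds after scheduling, according to the parameter description above.

```ruby
# Hypothetical helper: returns the first `count` expected expiry offsets
# (seconds after scheduling) for the given timer parameters.
def expiry_offsets(duration, recurring: true, immed: false, count: 3)
  first = immed ? 0 : duration          # immed fires right away
  return [first] unless recurring       # one-shot timers expire once
  (0...count).map { |i| first + i * duration }
end

p expiry_offsets(5)                                  # => [5, 10, 15]
p expiry_offsets(5, immed: true)                     # => [0, 5, 10]
p expiry_offsets(5, recurring: false)                # => [5]
p expiry_offsets(5, recurring: false, immed: true)   # => [0]
```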
#setMetaInfo(metainfo) ⇒ Object
Meant to be called from the handler. Sets the meta info for the current io.

    # File 'lib/quartz_torrent/reactor.rb', line 551
    def setMetaInfo(metainfo)
      @currentIoInfo.metainfo = metainfo
    end
#setReadRateLimit(rate) ⇒ Object
Meant to be called from the handler. Sets the max rate at which the current io can read. The argument must be a RateLimit or nil.

    # File 'lib/quartz_torrent/reactor.rb', line 556
    def setReadRateLimit(rate)
      raise "The argument must be a RateLimit" if ! rate.nil? && !rate.is_a?(RateLimit)
      @currentIoInfo.readRateLimit = rate
    end
#setWriteRateLimit(rate) ⇒ Object
Meant to be called from the handler. Sets the max rate at which the current io can be written to. The argument must be a RateLimit or nil.

    # File 'lib/quartz_torrent/reactor.rb', line 562
    def setWriteRateLimit(rate)
      raise "The argument must be a RateLimit" if ! rate.nil? && !rate.is_a?(RateLimit)
      @currentIoInfo.writeRateLimit = rate
    end
#start ⇒ Object
Run the event loop. The loop runs until the event loop body returns :halt. Unexpected exceptions are logged and the loop continues. When the loop finishes, all registered IOs are closed.

    # File 'lib/quartz_torrent/reactor.rb', line 465
    def start
      while true
        begin
          break if eventLoopBody == :halt
        rescue
          @logger.error "Unexpected exception in reactor event loop: #{$!}" if @logger
          @logger.error $!.backtrace.join "\n" if @logger
        end
      end

      @logger.info "Reactor shutting down" if @logger

      # Event loop finished
      @ioInfo.each do |k,v|
        k.close
      end
    end
#stop ⇒ Object
Stop the event loop.

    # File 'lib/quartz_torrent/reactor.rb', line 485
    def stop
      @stopped = true
      return if @currentEventPipeChars > 0
      @eventWrite.write 'x'
      @currentEventPipeChars += 1
      @eventWrite.flush
    end
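Both stop and scheduleTimer write a byte to an internal pipe so that a pending select() call returns early. The standalone sketch below illustrates that wake-up technique in isolation. It is not the reactor's event loop: the variable names, the 5 second timeout, and the use of a second thread to trigger the wake-up are assumptions made for the illustration.

```ruby
# A pipe whose read end is watched by select(), mirroring the role of
# @eventRead/@eventWrite in the reactor.
event_read, event_write = IO.pipe

# Assumed for this sketch: the wake-up request comes from another thread.
waker = Thread.new do
  sleep 0.1
  event_write.write 'x'   # any byte makes the read end readable
  event_write.flush
end

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
# Without the write above, this call would sleep for the full 5 seconds.
ready, = IO.select([event_read], nil, nil, 5)
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
waker.join

woke_early = !ready.nil? && ready.include?(event_read)
# Drain the byte so the pipe does not stay readable on the next pass.
drained = woke_early ? event_read.read_nonblock(1) : nil

puts "woke early: #{woke_early}, drained: #{drained.inspect}"

event_read.close
event_write.close
```

Tracking how many bytes are pending in the pipe, as the reactor does with @currentEventPipeChars, avoids writing more than one wake-up byte per pass.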
#stopped? ⇒ Boolean
Returns true if the reactor is stopped.

    # File 'lib/quartz_torrent/reactor.rb', line 415
    def stopped?
      @stopped
    end
#write(data) ⇒ Object
Meant to be called from the handler. Adds the specified data to the outgoing queue for the current io. Raises an exception if there is no current io, for example when called from a timer handler.

    # File 'lib/quartz_torrent/reactor.rb', line 517
    def write(data)
      if @currentIoInfo
        # This is meant to be called from inside a fiber. Should add a check to confirm that here.
        @currentIoInfo.readFiberIoFacade.write(data)
      else
        raise "Reactor.write called with no current io. Was it called from a timer handler?"
      end
    end