Class: EventCore::MainLoop
- Inherits:
-
Object
- Object
- EventCore::MainLoop
- Defined in:
- lib/event_core.rb
Overview
Core data structure for handling and polling Sources.
Instance Method Summary collapse
-
#add_fiber(&block) ⇒ Object
Schedule a block of code to be run inside a Ruby Fiber.
-
#add_idle(&block) ⇒ Object
Add an idle callback to the loop.
-
#add_once(delay_secs = nil, &block) ⇒ Object
Add an idle callback that is removed after its first invocation, no matter how it returns.
-
#add_quit(&block) ⇒ Object
Add a callback to invoke when the loop is quitting, before it becomes invalid.
-
#add_read(io, &block) ⇒ Object
Asynchronously read an IO calling the block each time data is ready.
-
#add_source(source) ⇒ Object
Add an event source to check in the loop.
-
#add_timeout(secs, &block) ⇒ Object
Add a timeout function to be called periodically, or until it returns with ‘next false’.
-
#add_unix_signal(*signals, &block) ⇒ Object
Add a unix signal handler that is dispatched in the main loop.
-
#add_write(io, buf, &block) ⇒ Object
Asynchronously write buf to io.
-
#initialize ⇒ MainLoop
constructor
A new instance of MainLoop.
-
#quit ⇒ Object
Safe and clean shutdown of the loop.
-
#run ⇒ Object
Start the loop, and do not return before some calls quit().
-
#running? ⇒ Boolean
Returns true iff a thread is currently iterating the loop with the run() method.
-
#send_wakeup ⇒ Object
Expert: wake up the main loop, forcing it to check all sources.
-
#spawn(*args, &block) ⇒ Object
Like Process.spawn(), invoking the given block in the main loop when the process child process exits.
-
#step ⇒ Object
Expert: Run a single iteration of the main loop.
-
#thread ⇒ Object
The Thread instance currently iterating the run() method.
-
#yield(&block) ⇒ Object
Must only be called from inside a fiber added with loop.add_fiber.
-
#yield_from_thread(&block) ⇒ Object
Must only be called from inside a fiber added with loop.add_fiber.
Constructor Details
#initialize ⇒ MainLoop
Returns a new instance of MainLoop.
341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 |
# File 'lib/event_core.rb', line 341 def initialize # We use a monitor, not a mutex, becuase Ruby mutexes are not reentrant, # and we need reentrancy to be able to add sources from within trigger callbacks @monitor = Monitor.new @monitor.synchronize { @sources = [] @quit_handlers = [] # Only ever set @do_quit through the quit() method! # Otherwise the state of the loop will be undefiend @do_quit = false @control_source = PipeSource.new @control_source.trigger { |event| # We can get multiple control messages in one event, # so generally it is a "string of control chars", hence the include? and not == # If event is nil, it means the pipe has been closed @do_quit = true if event.nil? || event.include?('q') } @sources << @control_source @sigchld_source = nil @children = [] @thread = nil } end |
Instance Method Details
#add_fiber(&block) ⇒ Object
Schedule a block of code to be run inside a Ruby Fiber. If the block calls loop.yield without any argument the fiber will simply be resumed repeatedly in subsequent iterations of the loop, until it terminates. If loop.yield is called with a block it signals that the proc should be executed as an async task and the result of the task delivered as return value from loop.yield. The block supplied must take a single argument which is a FiberTask instance. When the task is complete you must call task.done to return to the yielded fiber. The (optional) argument you supply to task.done(result) will be passed back to the yielded fiber.
Example:
loop.add_fiber {
puts 'Waiting for slow result...'
slow_result = loop.yield { |task|
Thread.new { sleep 10; task.done('This took 10s') }
}
puts slow_result
}
Note: You can call this method from any thread. Since Ruby Fibers must be created from the same thread that runs them, EventCore will ensure the the fiber is created on the same thread as the main loop is running.
501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 |
# File 'lib/event_core.rb', line 501 def add_fiber(&block) # Fibers must be created on the same thread that resumes them. # So if we're not on the main loop thread we get on it before # creating the fiber if Thread.current == @thread source = FiberSource.new(self, block) add_source(source) else source = FiberSource.new(self) add_once { source.create_fiber(block) add_source(source) } source end end |
#add_idle(&block) ⇒ Object
Add an idle callback to the loop. Will be removed like any other if it returns with ‘next false’. For one-off dispatches into the main loop, fx. for callbacks from another thread add_once() is even more convenient. Returns the source, so you can close!() it when no longer needed.
387 388 389 390 391 |
# File 'lib/event_core.rb', line 387 def add_idle(&block) source = IdleSource.new source.trigger { next false if block.call == false } add_source(source) end |
#add_once(delay_secs = nil, &block) ⇒ Object
Add an idle callback that is removed after its first invocation, no matter how it returns. Returns the source, for API consistency, but it is not really useful, as it will be auto-closed on next mainloop iteration.
397 398 399 400 401 |
# File 'lib/event_core.rb', line 397 def add_once(delay_secs=nil, &block) source = delay_secs.nil? ? IdleSource.new : TimeoutSource.new(delay_secs) source.trigger { block.call; next false } add_source(source) end |
#add_quit(&block) ⇒ Object
Add a callback to invoke when the loop is quitting, before it becomes invalid. Sources added during the callback will not be invoked, but will be cleaned up.
554 555 556 557 558 |
# File 'lib/event_core.rb', line 554 def add_quit(&block) @monitor.synchronize { @quit_handlers << block } end |
#add_read(io, &block) ⇒ Object
Asynchronously read an IO calling the block each time data is ready. The block receives to arguments: the read buffer, and an exception. The read buffer will be nil when EOF has been reached in which case the IO will be closed and the source removed from the loop. Returns the source so you can cancel the read with source.close!
454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 |
# File 'lib/event_core.rb', line 454 def add_read(io, &block) source = IOSource.new(io, :read) source.trigger { begin loop do buf = io.read_nonblock(4096*4) # 4 pages block.call(buf, nil) end rescue IO::WaitReadable # All good, wait until we're writable again next true rescue EOFError block.call(nil, nil) next false rescue => e block.call(nil, e) next false end } add_source(source) end |
#add_source(source) ⇒ Object
Add an event source to check in the loop. You can do this from any thread, or from trigger callbacks, or whenever you please. Returns the source, so you can close!() it when no longer needed.
372 373 374 375 376 377 378 379 380 |
# File 'lib/event_core.rb', line 372 def add_source(source) @monitor.synchronize { wakeup_needed = !@thread.nil? && @thread != Thread.current raise "Unable to add source - loop terminated" if @sources.nil? @sources << source send_wakeup if wakeup_needed } source end |
#add_timeout(secs, &block) ⇒ Object
Add a timeout function to be called periodically, or until it returns with ‘next false’. The timeout is in seconds and the first call is fired after it has elapsed. Returns the source, so you can close!() it when no longer needed.
406 407 408 409 410 |
# File 'lib/event_core.rb', line 406 def add_timeout(secs, &block) source = TimeoutSource.new(secs) source.trigger { next false if block.call == false } add_source(source) end |
#add_unix_signal(*signals, &block) ⇒ Object
Add a unix signal handler that is dispatched in the main loop. The handler will receive an array of signal numbers that was triggered since last step in the loop. You can provide one or more signals to listen for, given as integers or names. Returns the source, so you can close!() it when no longer needed.
417 418 419 420 421 |
# File 'lib/event_core.rb', line 417 def add_unix_signal(*signals, &block) source = UnixSignalSource.new(*signals) source.trigger { |signals| next false if block.call(signals) == false } add_source(source) end |
#add_write(io, buf, &block) ⇒ Object
Asynchronously write buf to io. Invokes block when complete, giving any encountered exception as argument, nil on success. Returns the source so you can close! it to cancel.
426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 |
# File 'lib/event_core.rb', line 426 def add_write(io, buf, &block) source = IOSource.new(io, :write) source.trigger { begin # Note: because of string encoding snafu, Ruby can report more bytes read than buf.length! len = io.write_nonblock(buf) if len == buf.bytesize block.call(nil) unless block.nil? next false end buf = buf.byteslice(len..-1) next true rescue IO::WaitWritable # All good, wait until we're writable again next true rescue => e block.call(e) unless block.nil? next false end } add_source(source) end |
#quit ⇒ Object
Safe and clean shutdown of the loop. Note that the loop will only shut down on next iteration, not immediately.
597 598 599 600 601 |
# File 'lib/event_core.rb', line 597 def quit # Does not require locking. If any data comes through in what ever form, # we quit the loop send_control('q') end |
#run ⇒ Object
Start the loop, and do not return before some calls quit(). When the loop returns (via quit) it will call close! on all sources.
605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 |
# File 'lib/event_core.rb', line 605 def run @thread = Thread.current loop do step break if @do_quit end @monitor.synchronize { @quit_handlers.each { |block| block.call } @children.each { |child| Process.detach(child[:pid]) } @children = nil @sources.each { |source| source.close! } @sources = nil @control_source.close! @thread = nil } end |
#running? ⇒ Boolean
Returns true iff a thread is currently iterating the loop with the run() method.
591 592 593 |
# File 'lib/event_core.rb', line 591 def running? !@thread.nil? end |
#send_wakeup ⇒ Object
Expert: wake up the main loop, forcing it to check all sources. Useful if you’re twiddling readyness of sources “out of band”.
698 699 700 |
# File 'lib/event_core.rb', line 698 def send_wakeup send_control('.') end |
#spawn(*args, &block) ⇒ Object
Like Process.spawn(), invoking the given block in the main loop when the process child process exits. The block is called with the Process::Status object of the child.
WARNING: The main loop install a SIGCHLD handler to automatically wait() on processes started this way. So this function will not work correctly if you tamper with SIGCHLD yourself.
When you quit the loop any non-waited for children will be detached with Process.detach() to prevent zombies.
Returns the PID of the child (that you should /not/ wait() on).
572 573 574 575 576 577 578 579 580 581 582 |
# File 'lib/event_core.rb', line 572 def spawn(*args, &block) if @sigchld_source.nil? @sigchld_source = add_unix_signal("CHLD") { reap_children } end pid = Process.spawn(*args) @children << {:pid => pid, :block => block} pid end |
#step ⇒ Object
Expert: Run a single iteration of the main loop.
629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 |
# File 'lib/event_core.rb', line 629 def step # Collect sources ready_sources = [] select_sources_by_ios = {} read_ios = [] write_ios = [] timeout = nil @monitor.synchronize { @sources.delete_if do |source| if source.closed? true else source_ready = source.ready? ready_sources << source if source_ready io = source.select_io unless io.nil? || io.closed? case source.select_type when :read read_ios << io when :write write_ios << io else raise "Invalid source select_type: #{source.select_type}" end select_sources_by_ios[io] = source end dt = source_ready ? 0 : source.timeout timeout = timeout.nil? ? dt : (dt.nil? ? timeout : (timeout < dt ? timeout : dt)) false end end } # Release lock while we're selecting so users can add sources. add_source() will see # that we are stuck in a select() and do send_wakeup(). # Note: Only select() without locking, everything else must be locked! read_ios, write_ios, exception_ios = IO.select(read_ios, write_ios, [], timeout) @monitor.synchronize { # On timeout read_ios will be nil unless read_ios.nil? read_ios.each { |io| ready_sources << select_sources_by_ios[io] } end unless write_ios.nil? write_ios.each { |io| ready_sources << select_sources_by_ios[io] } end } # Dispatch all sources marked ready ready_sources.each { |source| source.notify_trigger } @do_quit = true if @control_source.closed? end |
#thread ⇒ Object
The Thread instance currently iterating the run() method. nil if the loop is not running
586 587 588 |
# File 'lib/event_core.rb', line 586 def thread @thread end |
#yield(&block) ⇒ Object
Must only be called from inside a fiber added with loop.add_fiber. Without arguments simply passes control back to the mainloop and resumes execution in next mainloop iteration. If passed a block, the block must take exactly one argument, which is a FiberTask. The block will be executed and the fiber scheduled for resumption when task.done is called. If an argument is passed to task.done then this will become the return value of the yield.
525 526 527 528 |
# File 'lib/event_core.rb', line 525 def yield(&block) raise "Blocks passed to loop.yield must have arity 1" unless block.nil? or block.arity == 1 Fiber.yield block end |
#yield_from_thread(&block) ⇒ Object
Must only be called from inside a fiber added with loop.add_fiber. Convenience function on top of loop.yield, returning the result of a block run in a new thread. Unlike loop.yield the block must not take any arguments; it is simply the raw result from the block that is send back to the yielding fiber.
534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 |
# File 'lib/event_core.rb', line 534 def yield_from_thread(&block) raise 'A block must be provided' if block.nil? raise "Block must take exactly 0 arguments: #{block.arity}" unless block.arity == 0 self.yield do |task| thread = Thread.new { begin result = block.call ensure add_once { task.done(result) thread.join } end } end end |