Class: EventCore::MainLoop

Inherits:
Object
Defined in:
lib/event_core.rb

Overview

Core data structure for handling and polling Sources.

Constructor Details

#initialize ⇒ MainLoop

Returns a new instance of MainLoop.



# File 'lib/event_core.rb', line 341

def initialize
  # We use a monitor, not a mutex, because Ruby mutexes are not reentrant,
  # and we need reentrancy to be able to add sources from within trigger callbacks
  @monitor = Monitor.new

  @monitor.synchronize {
    @sources = []
    @quit_handlers = []

    # Only ever set @do_quit through the quit() method!
    # Otherwise the state of the loop will be undefined
    @do_quit = false
    @control_source = PipeSource.new
    @control_source.trigger { |event|
      # We can get multiple control messages in one event,
      # so generally it is a "string of control chars", hence the include? and not ==
      # If event is nil, it means the pipe has been closed
      @do_quit = true if event.nil? || event.include?('q')
    }
    @sources << @control_source

    @sigchld_source = nil
    @children = []

    @thread = nil
  }
end
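
The comment in the constructor about reentrancy can be checked with the standard library alone. The following is a minimal sketch, independent of EventCore, that contrasts a nested lock on a Monitor with a nested lock on a Mutex; the variable names are illustrative only.

```ruby
require 'monitor'

# Monitor is reentrant: the thread that already holds it may enter it again.
monitor = Monitor.new
nested = monitor.synchronize { monitor.synchronize { :reentered } }

# Mutex is not: a nested lock attempt by the owning thread raises ThreadError.
mutex = Mutex.new
mutex_result = begin
  mutex.synchronize { mutex.synchronize { :never_reached } }
rescue ThreadError => e
  e.class
end

p nested
p mutex_result
```

This is why a trigger callback, which presumably runs while the loop's lock may be held, can still call add_source() without deadlocking.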

Instance Method Details

#add_fiber(&block) ⇒ Object

Schedule a block of code to be run inside a Ruby Fiber. If the block calls loop.yield without any argument the fiber will simply be resumed repeatedly in subsequent iterations of the loop, until it terminates. If loop.yield is called with a block it signals that the proc should be executed as an async task and the result of the task delivered as return value from loop.yield. The block supplied must take a single argument which is a FiberTask instance. When the task is complete you must call task.done to return to the yielded fiber. The (optional) argument you supply to task.done(result) will be passed back to the yielded fiber.

Example:

loop.add_fiber {
  puts 'Waiting for slow result...'
  slow_result = loop.yield { |task|
                  Thread.new { sleep 10; task.done('This took 10s') }
                }
  puts slow_result
}

Note: You can call this method from any thread. Since Ruby Fibers must be created on the same thread that runs them, EventCore ensures that the fiber is created on the thread the main loop is running on.



# File 'lib/event_core.rb', line 501

def add_fiber(&block)
  # Fibers must be created on the same thread that resumes them.
  # So if we're not on the main loop thread we get on it before
  # creating the fiber
  if Thread.current == @thread
    source = FiberSource.new(self, block)
    add_source(source)
  else
    source = FiberSource.new(self)
    add_once {
      source.create_fiber(block)
      add_source(source)
    }
    source
  end
end
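
The thread restriction that add_fiber works around can be observed with plain Ruby. This is a small sketch using only core classes, not EventCore code: a fiber created on one thread refuses to be resumed from another.

```ruby
fiber = Fiber.new { :ran_on_creating_thread }

# Resuming from a different thread is rejected by the Ruby VM.
cross_thread = Thread.new do
  begin
    fiber.resume
  rescue FiberError => e
    e.class
  end
end.value

# The creating thread can still resume the fiber normally.
same_thread = fiber.resume

p cross_thread
p same_thread
```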

#add_idle(&block) ⇒ Object

Add an idle callback to the loop. Like any other callback, it is removed if it returns with ‘next false’. For one-off dispatches into the main loop, e.g. for callbacks from another thread, add_once() is more convenient. Returns the source, so you can close!() it when no longer needed.



# File 'lib/event_core.rb', line 387

def add_idle(&block)
  source = IdleSource.new
  source.trigger { next false if block.call == false }
  add_source(source)
end
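
The ‘next false’ convention can be illustrated without the library. The sketch below uses a hypothetical mini-dispatcher (a plain array of procs, not EventCore's real source list) that drops a callback as soon as it returns false and keeps it for any other return value, including nil.

```ruby
# Hypothetical stand-in for the loop's list of idle callbacks.
callbacks = []
ticks = 0

callbacks << proc {
  ticks += 1
  next false if ticks == 3 # ask to be removed on the third call
}

iterations = 0
until callbacks.empty?
  iterations += 1
  # Only an explicit false removes the callback; nil keeps it registered.
  callbacks.delete_if { |callback| callback.call == false }
end

p ticks
p iterations
```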

#add_once(delay_secs = nil, &block) ⇒ Object

Add an idle callback that is removed after its first invocation, no matter how it returns. If delay_secs is given, the callback instead fires once after that many seconds. Returns the source for API consistency, but it is not really useful, as it will be auto-closed on the next main loop iteration.



# File 'lib/event_core.rb', line 397

def add_once(delay_secs=nil, &block)
  source = delay_secs.nil? ? IdleSource.new : TimeoutSource.new(delay_secs)
  source.trigger { block.call; next false  }
  add_source(source)
end

#add_quit(&block) ⇒ Object

Add a callback to invoke when the loop is quitting, before it becomes invalid. Sources added during the callback will not be invoked, but will be cleaned up.



# File 'lib/event_core.rb', line 554

def add_quit(&block)
  @monitor.synchronize {
    @quit_handlers << block
  }
end

#add_read(io, &block) ⇒ Object

Asynchronously read an IO, calling the block each time data is ready. The block receives two arguments: the read buffer and an exception. The read buffer will be nil when EOF has been reached, in which case the IO will be closed and the source removed from the loop. Returns the source so you can cancel the read with source.close!



# File 'lib/event_core.rb', line 454

def add_read(io, &block)
  source = IOSource.new(io, :read)
  source.trigger {
    begin
      loop do
        buf = io.read_nonblock(4096*4) # 4 pages
        block.call(buf, nil)
      end
    rescue IO::WaitReadable
      # All good, wait until we're readable again
      next true
    rescue EOFError
      block.call(nil, nil)
      next false
    rescue => e
      block.call(nil, e)
      next false
    end
  }
  add_source(source)
end
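
The drain loop inside add_read can be tried on an ordinary pipe. This is a sketch of the same pattern with a hypothetical helper named drain; it is not part of EventCore, and it reports why the loop stopped instead of invoking a user block.

```ruby
# Read everything currently available, then report why reading stopped.
def drain(io, chunk_size = 4096 * 4)
  chunks = []
  loop { chunks << io.read_nonblock(chunk_size) }
rescue IO::WaitReadable
  [chunks, :would_block] # nothing more to read right now
rescue EOFError
  [chunks, :eof]         # the writing end has been closed
end

reader, writer = IO.pipe
writer.write('hello ')
writer.write('world')

first_chunks, first_status = drain(reader)
writer.close
second_chunks, second_status = drain(reader)

p first_chunks.join
p first_status
p second_status
```

The first call corresponds to the ‘next true’ branch in add_read (keep the source), the second to the EOF branch (call the block with nil and remove the source).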

#add_source(source) ⇒ Object

Add an event source to check in the loop. You can do this from any thread, or from trigger callbacks, or whenever you please. Returns the source, so you can close!() it when no longer needed.



# File 'lib/event_core.rb', line 372

def add_source(source)
  @monitor.synchronize {
    wakeup_needed = !@thread.nil? && @thread != Thread.current
    raise "Unable to add source - loop terminated" if @sources.nil?
    @sources << source
    send_wakeup if wakeup_needed
  }
  source
end

#add_timeout(secs, &block) ⇒ Object

Add a timeout function to be called periodically until it returns with ‘next false’. The timeout is in seconds, and the first call is fired after it has elapsed. Returns the source, so you can close!() it when no longer needed.



# File 'lib/event_core.rb', line 406

def add_timeout(secs, &block)
  source = TimeoutSource.new(secs)
  source.trigger { next false if block.call == false }
  add_source(source)
end

#add_unix_signal(*signals, &block) ⇒ Object

Add a unix signal handler that is dispatched in the main loop. The handler will receive an array of the signal numbers that were triggered since the last step in the loop. You can provide one or more signals to listen for, given as integers or names. Returns the source, so you can close!() it when no longer needed.



# File 'lib/event_core.rb', line 417

def add_unix_signal(*signals, &block)
  source = UnixSignalSource.new(*signals)
  source.trigger { |signals|  next false if block.call(signals) == false }
  add_source(source)
end
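
The idea of collecting signal numbers and handing them over as a batch can be sketched with Signal.trap alone. This is an illustration under assumptions, not the UnixSignalSource implementation: the pending array and the polling wait are made up for the example, and it needs a platform that supports SIGUSR1.

```ruby
require 'timeout'

pending = []

# The trap handler only records the signal number; real work happens later.
previous = Signal.trap('USR1') { |signo| pending << signo }

Process.kill('USR1', Process.pid)

# Wait (bounded) until the VM has delivered the signal to the handler.
Timeout.timeout(5) { sleep 0.01 while pending.empty? }

# Restore whatever handler was installed before.
Signal.trap('USR1', previous)

# Hand the batch over, as a loop iteration would, and reset the buffer.
delivered = pending.dup
pending.clear

p delivered == [Signal.list['USR1']]
```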

#add_write(io, buf, &block) ⇒ Object

Asynchronously write buf to io. Invokes block when complete, giving any encountered exception as argument, nil on success. Returns the source so you can close! it to cancel.



# File 'lib/event_core.rb', line 426

def add_write(io, buf, &block)
  source = IOSource.new(io, :write)
  source.trigger {
    begin
      # Note: because of string encoding snafu, Ruby can report more bytes written than buf.length,
      # so we compare against buf.bytesize and slice with byteslice
      len = io.write_nonblock(buf)
      if len == buf.bytesize
        block.call(nil) unless block.nil?
        next false
      end
      buf = buf.byteslice(len..-1)
      next true
    rescue IO::WaitWritable
      # All good, wait until we're writable again
      next true
    rescue => e
      block.call(e) unless block.nil?
      next false
    end
  }
  add_source(source)
end
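
The reason add_write compares against bytesize and slices with byteslice shows up as soon as the buffer contains multibyte characters. The sketch below writes such a string to a pipe with the same partial-write handling; it blocks in IO.select where the real method would return ‘next true’ to the loop, and all names are illustrative.

```ruby
reader, writer = IO.pipe

# "\u" escapes make this a UTF-8 string: 10 characters but 12 bytes.
buf = "na\u00EFve caf\u00E9"

remaining = buf
until remaining.empty?
  begin
    len = writer.write_nonblock(remaining) # number of BYTES written
    remaining = remaining.byteslice(len..-1)
  rescue IO::WaitWritable
    IO.select(nil, [writer])               # wait until the pipe has room
  end
end
writer.close

received = reader.read.force_encoding(buf.encoding)

p buf.length
p buf.bytesize
p received == buf
```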

#quit ⇒ Object

Safe and clean shutdown of the loop. Note that the loop will only shut down on next iteration, not immediately.



# File 'lib/event_core.rb', line 597

def quit
  # Does not require locking. If any data comes through in what ever form,
  # we quit the loop
  send_control('q')
end
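
Control messages travel through a pipe, so several of them can arrive in a single read. The following sketch uses a bare IO.pipe as a stand-in for the loop's control source (the real PipeSource class is not used here) and shows why the constructor checks include?('q') rather than equality.

```ruby
control_r, control_w = IO.pipe

control_w.write('.') # a wake-up message
control_w.write('q') # a quit request

# Both bytes are already in the pipe, so one read returns them together.
event = control_r.read_nonblock(64)
quit_requested = event.include?('q')

p event
p quit_requested
```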

#run ⇒ Object

Start the loop, and do not return until something calls quit(). When the loop returns (via quit) it will call close! on all sources.



# File 'lib/event_core.rb', line 605

def run
  @thread = Thread.current

  loop do
    step
    break if @do_quit
  end

  @monitor.synchronize {
    @quit_handlers.each { |block| block.call }

    @children.each { |child| Process.detach(child[:pid]) }
    @children = nil

    @sources.each { |source| source.close! }
    @sources = nil

    @control_source.close!

    @thread = nil
  }
end

#running? ⇒ Boolean

Returns true iff a thread is currently iterating the loop with the run() method.

Returns:
  • (Boolean)



# File 'lib/event_core.rb', line 591

def running?
  !@thread.nil?
end

#send_wakeup ⇒ Object

Expert: wake up the main loop, forcing it to check all sources. Useful if you’re twiddling readiness of sources “out of band”.



# File 'lib/event_core.rb', line 698

def send_wakeup
  send_control('.')
end
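
Waking a thread that is blocked in select() by writing to a pipe it is watching is a common pattern, and it can be sketched with core classes only. Here a bare IO.pipe plays the role of the control source; the five-second timeout is an arbitrary safety net for the example.

```ruby
control_r, control_w = IO.pipe

# This thread stands in for the main loop blocked in IO.select.
waiter = Thread.new do
  ready, = IO.select([control_r], nil, nil, 5)
  ready ? control_r.read_nonblock(16) : :timed_out
end

# Any other thread can interrupt the select by writing a byte.
control_w.write('.')

message = waiter.value
p message
```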

#spawn(*args, &block) ⇒ Object

Like Process.spawn(), invoking the given block in the main loop when the child process exits. The block is called with the Process::Status object of the child.

WARNING: The main loop installs a SIGCHLD handler to automatically wait() on processes started this way. So this function will not work correctly if you tamper with SIGCHLD yourself.

When you quit the loop any non-waited for children will be detached with Process.detach() to prevent zombies.

Returns the PID of the child (that you should /not/ wait() on).



# File 'lib/event_core.rb', line 572

def spawn(*args, &block)
  if @sigchld_source.nil?
    @sigchld_source = add_unix_signal("CHLD") {
      reap_children
    }
  end

  pid = Process.spawn(*args)
  @children << {:pid => pid, :block => block}
  pid
end
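
What Process.detach does for children that nobody waits on can be seen in isolation. This sketch spawns a short-lived child with the current Ruby interpreter (located through RbConfig, an assumption made so the example does not depend on PATH) and lets the detach watcher thread reap it.

```ruby
require 'rbconfig'

# Start a child process that exits immediately with status 3.
pid = Process.spawn(RbConfig.ruby, '-e', 'exit 3')

# Process.detach returns a thread that waits on the child, so it never
# lingers as a zombie; the thread's value is the Process::Status.
watcher = Process.detach(pid)
status = watcher.value

p status.pid == pid
p status.exitstatus
```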

#step ⇒ Object

Expert: Run a single iteration of the main loop.



# File 'lib/event_core.rb', line 629

def step
  # Collect sources
  ready_sources = []
  select_sources_by_ios = {}
  read_ios = []
  write_ios = []
  timeout = nil

  @monitor.synchronize {
    @sources.delete_if do |source|
      if source.closed?
        true
      else
        source_ready = source.ready?
        ready_sources << source if source_ready

        io = source.select_io
        unless io.nil? || io.closed?
          case source.select_type
            when :read
              read_ios << io
            when :write
              write_ios << io
            else
              raise "Invalid source select_type: #{source.select_type}"
          end

          select_sources_by_ios[io] = source
        end

        dt = source_ready ? 0 : source.timeout
        timeout = timeout.nil? ?
            dt : (dt.nil? ? timeout : (timeout < dt ? timeout : dt))

        false
      end
    end
  }

  # Release lock while we're selecting so users can add sources. add_source() will see
  # that we are stuck in a select() and do send_wakeup().
  # Note: Only select() without locking, everything else must be locked!
  read_ios, write_ios, exception_ios = IO.select(read_ios, write_ios, [], timeout)

  @monitor.synchronize {
    # On timeout read_ios will be nil
    unless read_ios.nil?
      read_ios.each { |io|
        ready_sources << select_sources_by_ios[io]
      }
    end

    unless write_ios.nil?
      write_ios.each { |io|
        ready_sources << select_sources_by_ios[io]
      }
    end
  }

  # Dispatch all sources marked ready
  ready_sources.each { |source|
    source.notify_trigger
  }

  @do_quit = true if @control_source.closed?
end
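
The nested ternary in step that picks the select() timeout boils down to "the smallest non-nil timeout, with ready sources counting as zero". A compact sketch of that rule follows; the helper name select_timeout and the hash layout are hypothetical and exist only for this example.

```ruby
# Each entry describes one source: whether it is already ready, and the
# timeout it asks for (nil means "no timeout, wait for IO only").
def select_timeout(entries)
  entries.map { |entry| entry[:ready] ? 0 : entry[:timeout] }.compact.min
end

only_io     = select_timeout([{ ready: false, timeout: nil }])
with_timers = select_timeout([{ ready: false, timeout: nil },
                              { ready: false, timeout: 10 },
                              { ready: false, timeout: 2.5 }])
one_ready   = select_timeout([{ ready: true,  timeout: nil },
                              { ready: false, timeout: 2.5 }])

p only_io
p with_timers
p one_ready
```

A nil result means select() may block indefinitely, which is why add_source() has to send a wake-up when called from another thread.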

#thread ⇒ Object

The Thread instance currently iterating the run() method, or nil if the loop is not running.



# File 'lib/event_core.rb', line 586

def thread
  @thread
end

#yield(&block) ⇒ Object

Must only be called from inside a fiber added with loop.add_fiber. Without arguments simply passes control back to the mainloop and resumes execution in next mainloop iteration. If passed a block, the block must take exactly one argument, which is a FiberTask. The block will be executed and the fiber scheduled for resumption when task.done is called. If an argument is passed to task.done then this will become the return value of the yield.



# File 'lib/event_core.rb', line 525

def yield(&block)
  raise "Blocks passed to loop.yield must have arity 1" unless block.nil? or block.arity == 1
  Fiber.yield block
end
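
Since the method body is just Fiber.yield block, the handshake it describes can be sketched with a plain Fiber and a hand-written scheduler. TinyTask below is a hypothetical stand-in for FiberTask, and the three resume calls play the role of main loop iterations; none of this is EventCore's actual FiberSource code.

```ruby
# Hypothetical stand-in for FiberTask: done() resumes the waiting fiber.
TinyTask = Struct.new(:fiber) do
  def done(result = nil)
    fiber.resume(result)
  end
end

log = []
fiber = Fiber.new do
  log << :started
  Fiber.yield                                      # plain yield: resume later
  log << :resumed
  answer = Fiber.yield(->(task) { task.done(42) }) # yield with a task block
  log << answer
end

fiber.resume                      # runs until the first plain yield
request = fiber.resume            # runs until the second yield, returns the block
request.call(TinyTask.new(fiber)) # the "loop" runs the block; done() resumes

p log
```

The value passed to done() becomes the return value of the yield inside the fiber, which matches the behaviour described for loop.yield.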

#yield_from_thread(&block) ⇒ Object

Must only be called from inside a fiber added with loop.add_fiber. Convenience function on top of loop.yield, returning the result of a block run in a new thread. Unlike loop.yield, the block must not take any arguments; it is simply the raw result from the block that is sent back to the yielding fiber.



# File 'lib/event_core.rb', line 534

def yield_from_thread(&block)
  raise 'A block must be provided' if block.nil?
  raise "Block must take exactly 0 arguments: #{block.arity}" unless block.arity == 0

  self.yield do |task|
    thread = Thread.new {
      begin
        result = block.call
      ensure
        add_once {
          task.done(result)
          thread.join
        }
      end
    }
  end
end
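
The shape of yield_from_thread (do the work on a worker thread, then hand the result back to the loop thread from an ensure clause) can be sketched with a Queue standing in for add_once. The inbox name and the lambda wrapper are assumptions made for this illustration.

```ruby
# Stand-in for the main loop's one-off dispatch queue (add_once).
inbox = Queue.new

worker = Thread.new do
  begin
    result = 6 * 7 # the slow work would happen here
  ensure
    # Runs even if the work raises, in which case result is still nil.
    inbox << -> { result }
  end
end

# The "loop thread" picks up the callback and runs it on its own thread.
callback = inbox.pop
worker.join
value = callback.call

p value
```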