Class: Goru::Scheduler
- Inherits:
-
Object
- Object
- Goru::Scheduler
- Includes:
- Core::Global
- Defined in:
- lib/goru/scheduler.rb
Overview
- public
Class Method Summary collapse
-
.default_reactor_count ⇒ Object
[public].
-
.go ⇒ Object
Prevents issues when including `Goru` at the top level.
Instance Method Summary collapse
-
#go(state = nil, io: nil, channel: nil, intent: nil, &block) ⇒ Object
[public].
-
#initialize(count: self.class.default_reactor_count) ⇒ Scheduler
constructor
A new instance of Scheduler.
-
#signal ⇒ Object
[public].
-
#stop ⇒ Object
[public].
-
#wait ⇒ Object
[public].
- #wakeup ⇒ Object
Constructor Details
#initialize(count: self.class.default_reactor_count) ⇒ Scheduler
Returns a new instance of Scheduler.
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/goru/scheduler.rb', line 37

# Sets up the scheduler state and starts one thread per reactor.
#
# All reactors share a single routine queue and receive this scheduler
# instance as their `scheduler:`.
#
# @param count [Integer] number of reactors (and threads) to start;
#   defaults to the class-level `default_reactor_count`.
def initialize(count: self.class.default_reactor_count)
  super()

  @waiting = false
  @stopped = false
  @routines = Thread::Queue.new
  @selector = NIO::Selector.new

  @reactors = count.times.map do
    Reactor.new(queue: @routines, scheduler: self)
  end

  @threads = @reactors.map do |reactor|
    Thread.new do
      # `Interrupt` is masked with `:never` while the reactor runs in this thread.
      Thread.handle_interrupt(Interrupt => :never) do
        begin
          reactor.run
        rescue IOError
          # An IOError raised by the reactor ends this thread quietly.
        end
      end
    end
  end
end
Class Method Details
.default_reactor_count ⇒ Object
- public
32 33 34 |
# File 'lib/goru/scheduler.rb', line 32

# Default number of reactors for a new scheduler.
#
# @return [Integer] the value reported by `Etc.nprocessors`.
def default_reactor_count
  Etc.nprocessors
end
.go ⇒ Object
Prevents issues when including `Goru` at the top level.
- public
26 27 28 |
# File 'lib/goru/scheduler.rb', line 26

# Forwards every argument (positional, keyword, and block) to `go` on the
# object returned by `global`, and returns whatever that call returns.
def go(...)
  global.go(...)
end
Instance Method Details
#go(state = nil, io: nil, channel: nil, intent: nil, &block) ⇒ Object
- public
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/goru/scheduler.rb', line 61

# Builds a routine, pushes it onto the shared routine queue, and wakes every
# reactor.
#
# The routine type depends on the keyword arguments:
# * `io:` given            -> `Routines::IO`
# * `channel:` with `:r`   -> `Routines::Channels::Readable`
# * `channel:` with `:w`   -> `Routines::Channels::Writable`
# * neither                -> a plain `Routine`
#
# @param state [Object, nil] passed through to the routine constructor.
# @param io [Object, nil] io object for an io routine.
# @param channel [Object, nil] channel for a channel routine.
# @param intent [Symbol, nil] forwarded to `Routines::IO`; for channels it
#   must be `:r` or `:w`.
# @return [Object] the routine that was queued.
# @raise [ArgumentError] if both `io` and `channel` are given, or if
#   `channel` is given with an intent other than `:r` or `:w`.
def go(state = nil, io: nil, channel: nil, intent: nil, &block)
  raise ArgumentError, "cannot set both `io` and `channel`" if io && channel

  routine = if io
    # NOTE(review): `@io_event_loop` is not assigned in the constructor shown
    # alongside this method, so it is nil unless set elsewhere — confirm.
    Routines::IO.new(state, io: io, intent: intent, event_loop: @io_event_loop, &block)
  elsif channel
    case intent
    when :r
      Routines::Channels::Readable.new(state, channel: channel, &block)
    when :w
      Routines::Channels::Writable.new(state, channel: channel, &block)
    else
      # Without this branch the `case` evaluates to nil, and nil would be
      # pushed onto the routine queue and returned to the caller.
      raise ArgumentError, "unknown channel intent #{intent.inspect} (expected `:r` or `:w`)"
    end
  else
    Routine.new(state, &block)
  end

  @routines << routine
  @reactors.each(&:wakeup)

  routine
end
#signal ⇒ Object
- public
107 108 109 110 111 |
# File 'lib/goru/scheduler.rb', line 107

# Ends a pending wait once every reactor reports it is finished.
#
# Does nothing (and returns nil) unless the scheduler is currently waiting
# and all reactors answer true to `finished?`. Otherwise clears the waiting
# flag and returns the result of `wakeup`.
def signal
  if @waiting && @reactors.all? { |reactor| reactor.finished? }
    @waiting = false
    wakeup
  end
end
#stop ⇒ Object
- public
97 98 99 100 101 102 103 |
# File 'lib/goru/scheduler.rb', line 97

# Shuts the scheduler down.
#
# Marks the scheduler as stopped, closes the routine queue and the selector,
# asks each reactor to stop, then joins each reactor thread, in that order.
#
# @return [Array] the list of threads that were joined.
def stop
  @stopped = true

  @routines.close
  @selector.close

  @reactors.each { |reactor| reactor.stop }
  @threads.each { |thread| thread.join }
end
#wait ⇒ Object
- public
85 86 87 88 89 90 91 92 93 |
# File 'lib/goru/scheduler.rb', line 85

# Waits until the waiting flag is cleared, then stops the scheduler.
#
# Sets the waiting flag, wakes every reactor, and keeps calling `select` on
# the selector for as long as the flag stays set. An `IOError` or `Interrupt`
# raised along the way ends the wait quietly. `stop` always runs afterwards.
#
# @return [nil]
def wait
  begin
    @waiting = true
    @reactors.each { |reactor| reactor.wakeup }

    while @waiting
      @selector.select
    end
  rescue IOError, Interrupt
    # nothing to do
  ensure
    stop
  end
end
#wakeup ⇒ Object
113 114 115 116 117 |
# File 'lib/goru/scheduler.rb', line 113

# Calls `wakeup` on the scheduler's selector.
#
# An `IOError` raised by the selector is swallowed, in which case nil is
# returned; otherwise the selector's own return value is passed through.
def wakeup
  begin
    @selector.wakeup
  rescue IOError
    # nothing to do
  end
end