Class: Goru::Reactor

Inherits:
Object
  • Object
show all
Defined in:
lib/goru/reactor.rb

Overview

public

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue:, scheduler:) ⇒ Reactor

Returns a new instance of Reactor.



15
16
17
18
19
20
21
22
23
24
25
# File 'lib/goru/reactor.rb', line 15

# Builds a reactor bound to the given queue and scheduler.
#
# @param queue [Object] stored in `@queue`; `wakeup` pushes onto it with `<<`
# @param scheduler [Object] stored in `@scheduler`
def initialize(queue:, scheduler:)
  # Collaborators handed in by the caller.
  @queue, @scheduler = queue, scheduler

  # Lifecycle state, guarded by a mutex where it is read in `finished?`.
  @mutex = Mutex.new
  @status = nil
  @stopped = false

  # Work tracked by this reactor: plain routines and bridges live in sets,
  # IO routines are registered with the selector, sleeps with the timers.
  @routines = Set.new
  @bridges = Set.new
  @selector = NIO::Selector.new
  @timers = Timers::Group.new
end

Instance Attribute Details

#status ⇒ Object (readonly)

public


29
30
31
# File 'lib/goru/reactor.rb', line 29

# Returns the value currently held in `@status`.
#
# The value is read directly, without taking `@mutex`.
#
# @return [Object] the current status (`nil` until one has been set)
def status
  @status
end

Instance Method Details

#adopt_routine(routine) ⇒ Object

public


157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/goru/reactor.rb', line 157

# Takes ownership of a routine, tracking it according to its kind.
#
# * `Routines::IO` - registered with the selector for its `io` and `intent`;
#   the resulting monitor and the routine are linked to each other.
# * `Routines::Bridge` - added to `@bridges`.
# * `Routine` - added to `@routines`.
#
# In each of those cases the routine's `reactor` is set to this reactor.
# Any other object is ignored.
#
# @param routine [Object] the routine to adopt
def adopt_routine(routine)
  case routine
  when Routines::IO
    # Wire the monitor and the routine together before claiming the routine.
    @selector.register(routine.io, routine.intent).tap do |monitor|
      monitor.value = routine
      routine.monitor = monitor
    end
    routine.reactor = self
  when Routines::Bridge
    routine.reactor = self
    @bridges.add(routine)
  when Routine
    routine.reactor = self
    @routines.add(routine)
  end
end

#finished? ⇒ Boolean

public

Returns:

  • (Boolean)


118
119
120
121
122
# File 'lib/goru/reactor.rb', line 118

# Reports whether the reactor has no more work in progress.
#
# The reactor counts as finished when its status is `:idle`, `:stopped`, or
# `:finished`. `:finished` is the status `run` records in its `ensure` block
# once the loop has exited, so a reactor whose loop has ended is reported as
# finished rather than appearing to still be busy.
#
# @return [Boolean]
def finished?
  # Read the status while holding the mutex.
  @mutex.synchronize do
    @status == :idle || @status == :stopped || @status == :finished
  end
end

#routine_asleep(routine, seconds) ⇒ Object

public


149
150
151
152
153
# File 'lib/goru/reactor.rb', line 149

# Schedules a timer that wakes the routine after the given delay.
#
# @param routine [#wake] the routine to wake when the timer fires
# @param seconds [Numeric] delay handed to `@timers.after`
# @return [Object] whatever `@timers.after` returns
def routine_asleep(routine, seconds)
  @timers.after(seconds) do
    routine.wake
  end
end

#routine_finished(routine) ⇒ Object

public


175
176
177
178
179
180
181
182
183
184
# File 'lib/goru/reactor.rb', line 175

# Stops tracking a routine once it has finished.
#
# Bridges are removed from `@bridges`, IO routines have their `io`
# deregistered from the selector, and anything else is removed from
# `@routines`.
#
# @param routine [Object] the routine that finished
def routine_finished(routine)
  case routine
  when Routines::Bridge then @bridges.delete(routine)
  when Routines::IO then @selector.deregister(routine.io)
  else @routines.delete(routine)
  end
end

#run ⇒ Object

public


33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/goru/reactor.rb', line 33

# Runs the reactor loop until `@stopped` becomes true.
#
# Each pass marks the reactor as `:running`, calls every routine in
# `@routines`, and then makes a non-blocking attempt to wait for a routine.
# When that attempt raises `ThreadError`, the reactor decides how to wait
# based on the timers' wait interval, whether any routines remain, and
# whether the selector is empty, and afterwards fires the timers.
#
# On the way out (including when an exception escapes the loop) the selector
# is closed and the status is set to `:finished`.
def run
  until @stopped
    set_status(:running)

    @routines.each do |routine|
      call_routine(routine)
    end

    begin
      # NOTE(review): presumably a non-blocking queue pop that raises
      # ThreadError when nothing is waiting — confirm in `wait_for_routine`.
      wait_for_routine(block: false)
    rescue ThreadError
      interval = @timers.wait_interval

      if interval.nil?
        # NOTE(review): a nil interval presumably means no timers are
        # pending — confirm against the timers library.
        if @routines.empty?
          if @selector.empty?
            # No routines and nothing registered with the selector.
            become_idle
          else
            # Only the selector has work: wait on it with no timeout argument.
            wait_for_bridge do
              wait_for_selector
            end
          end
        else
          # Routines remain, so the selector is given a timeout of 0.
          wait_for_bridge do
            wait_for_selector(0)
          end
        end
      elsif interval > 0
        if @selector.empty?
          wait_for_interval(interval)
        else
          # Wait on the selector, passing the timer interval as the timeout.
          wait_for_bridge do
            wait_for_selector(interval)
          end
        end
      end
      # An interval that is neither nil nor positive skips waiting entirely.

      # Inside the rescue clause, so this runs only on the ThreadError path.
      @timers.fire
    end
  end
ensure
  @selector.close
  set_status(:finished)
end

#signal ⇒ Object

public


126
127
128
129
130
# File 'lib/goru/reactor.rb', line 126

# Wakes the selector, but only when it is not empty.
#
# @return [Object, nil] the result of `@selector.wakeup`, or `nil` when the
#   selector is empty
def signal
  @selector.wakeup unless @selector.empty?
end

#stop ⇒ Object

public


141
142
143
144
145
# File 'lib/goru/reactor.rb', line 141

# Asks the run loop to stop and wakes the selector.
#
# Sets `@stopped`, which `run` checks at the top of each pass, then wakes
# the selector. An `IOError` from the wakeup is deliberately ignored.
#
# @return [Object, nil] the result of `@selector.wakeup`, or `nil` when an
#   `IOError` was swallowed
def stop
  @stopped = true

  begin
    @selector.wakeup
  rescue IOError
    # NOTE(review): presumably raised when the selector is already closed
    # (`run` closes it on exit) — confirm against the selector library.
    nil
  end
end

#wakeup ⇒ Object

public


134
135
136
137
# File 'lib/goru/reactor.rb', line 134

# Wakes the reactor: calls `signal`, then pushes `:wakeup` onto `@queue`.
#
# NOTE(review): presumably the queued `:wakeup` unblocks a reactor that is
# waiting on the queue rather than on the selector — confirm against the
# code that pops from `@queue`.
def wakeup
  signal
  @queue << :wakeup
end