Class: Goru::Reactor

Inherits:
Object
  • Object
show all
Defined in:
lib/goru/reactor.rb

Overview

public

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue:, scheduler:) ⇒ Reactor

Returns a new instance of Reactor.



14
15
16
17
18
19
20
21
22
23
# File 'lib/goru/reactor.rb', line 14

# Sets up a reactor in its initial, not-yet-running state.
#
# @param queue [Object] source of routines; stored as-is and later popped from by {#run}
# @param scheduler [Object] stored as-is; {#run} calls +signal(self)+ on it when idle
def initialize(queue:, scheduler:)
  # Collaborators handed in by the caller.
  @scheduler = scheduler
  @queue = queue

  # Routine bookkeeping. {#run} iterates +@routines+; +@finished+ is not read
  # in the methods shown here (presumably consumed by the cleanup helper).
  @finished = []
  @routines = []

  # Event sources: the I/O selector and the timer group used by {#sleep}.
  @selector = NIO::Selector.new
  @timers = Timers::Group.new

  # No status until {#run} is entered; the stop flag starts cleared.
  @status = nil
  @stopped = false
end

Instance Attribute Details

#status ⇒ Object (readonly)

public


27
28
29
# File 'lib/goru/reactor.rb', line 27

# Current lifecycle state of the reactor.
#
# The value is +nil+ after construction, +:running+ once {#run} has been
# entered, +:looking+ while {#run} is idle and waiting on the queue for a
# new routine, and +:finished+ after {#run} has exited.
#
# @return [Symbol, nil]
def status
  @status
end

Instance Method Details

#run ⇒ Object

public


31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/goru/reactor.rb', line 31

# Runs the reactor loop until {#stop} sets the stop flag.
#
# Each iteration calls every adopted routine, cleans up finished ones, then
# tries to take one more routine from the queue without blocking. When the
# queue has nothing to offer, the reactor decides how to wait based on the
# next timer interval, the presence of adopted routines, and whether the
# selector has anything registered. On exit (normal or via an exception) the
# selector is closed and the status becomes +:finished+.
def run
  @status = :running

  until @stopped
    # Give every adopted routine a turn.
    @routines.each do |routine|
      call_routine(routine)
    end

    # TODO: Remove the monitor from the selector.
    #
    cleanup_finished_routines

    begin
      # Non-blocking pop: an empty queue is reported by raising ThreadError
      # (assumes the queue follows ::Queue#pop(true) semantics — confirm).
      # NOTE(review): a ThreadError raised by adopt_routine itself would also
      # land in the rescue below and be treated as "queue empty".
      if (routine = @queue.pop(true))
        adopt_routine(routine)
      end
    rescue ThreadError
      # Nothing was queued; decide how long we may wait before the next timer.
      interval = @timers.wait_interval

      if interval.nil?
        # A nil interval is treated as "no timer to wait for".
        if @routines.empty?
          if @selector.empty?
            # Completely idle: advertise that we are looking for work, notify
            # the scheduler, and block on the queue until something arrives.
            # NOTE(review): @status is not set back to :running in this
            # method — presumably adopt_routine or the scheduler does; verify.
            @status = :looking
            @scheduler.signal(self)
            if (routine = @queue.pop)
              adopt_routine(routine)
            end
          else
            # TODO: The issue doing this is that this reactor won't grab new routines. Will calling `@selector.wakeup`
            # from the scheduler when a routine is added to the queue resolve this?
            #
            # No routines but the selector has monitors: select with no
            # timeout and invoke the callable stored as each ready monitor's value.
            @selector.select do |monitor|
              monitor.value.call
            end
          end
        else
          # Routines are still active: select with a zero timeout so the loop
          # gets back to calling them.
          @selector.select(0) do |monitor|
            monitor.value.call
          end
        end
      elsif interval > 0
        # A timer is pending; wait no longer than `interval`.
        if @selector.empty?
          # No I/O registered: wait on the queue instead, bounded by the time
          # remaining, and stop waiting as soon as a routine is adopted.
          Timers::Wait.for(interval) do |remaining|
            if (routine = @queue.pop_with_timeout(remaining))
              adopt_routine(routine)
              break
            end
          rescue ThreadError
            # A ThreadError raised inside this wait block is ignored on purpose.
          end
        else
          # I/O is registered: wait on the selector, bounded by `interval`.
          @selector.select(interval) do |monitor|
            monitor.value.call
          end
        end
      end
      # An interval <= 0 takes neither branch and falls straight through.

      # NOTE(review): timers are fired only on this rescue path, i.e. on
      # iterations where the non-blocking pop raised ThreadError.
      @timers.fire
    end
  end
ensure
  # Always release the selector and record that the loop has ended.
  @selector.close
  @status = :finished
end

#sleep(routine, seconds) ⇒ Object

public


108
109
110
111
112
# File 'lib/goru/reactor.rb', line 108

# Arranges for +routine+ to be woken once +seconds+ have elapsed.
#
# A one-shot callback is registered on the reactor's timer group; when it
# fires it calls +routine.wake+. Nothing blocks here.
#
# @param routine [#wake] object to wake when the timer fires
# @param seconds [Numeric] delay handed to the timer group
# @return [Object] whatever +@timers.after+ returns
def sleep(routine, seconds)
  @timers.after(seconds) { routine.wake }
end

#stop ⇒ Object

public


98
99
100
101
102
103
104
# File 'lib/goru/reactor.rb', line 98

# Asks the reactor to stop.
#
# Sets the stop flag that {#run} checks at the top of each loop iteration,
# then wakes the selector, unless it is already closed, so that a select
# call in progress can return.
#
# @return [Object, nil] the result of +@selector.wakeup+, or +nil+ when the
#   selector was already closed
def stop
  @stopped = true
  @selector.wakeup unless @selector.closed?
end