Class: Bran::LibUV::Reactor

Inherits:
Object
  • Object
show all
Defined in:
lib/bran/libuv/reactor.rb,
lib/bran/libuv/reactor/collections.rb

Defined Under Namespace

Modules: Collections Classes: DestroyedError

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeReactor

Returns a new instance of Reactor.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/bran/libuv/reactor.rb', line 11

def initialize
  @ptr = FFI.uv_loop_alloc
  Util.error_check "creating the loop",
    FFI.uv_loop_init(@ptr)
  
  @finalizer = self.class.create_finalizer_for(@ptr)
  ObjectSpace.define_finalizer(self, @finalizer)
  
  @read_polls  = Collections::Read.new(self)
  @write_polls = Collections::Write.new(self)
  @write_polls.bond_with @read_polls
  
  @signal_polls = Collections::Signal.new(self)
  @timer_polls  = Collections::Timer.new(self)
  
  # TODO: add more Ruby-compatible signal handlers by default?
  push_signalable :INT, Proc.new { @abort_signal = :INT; stop! }
end

Class Method Details

.create_finalizer_for(ptr) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



52
53
54
55
56
57
58
# File 'lib/bran/libuv/reactor.rb', line 52

def self.create_finalizer_for(ptr)
  Proc.new do
    FFI.uv_loop_close(ptr)
    # TODO: prevent running finalizer when loop hasn't been stopped?
    ptr.free
  end
end

Instance Method Details

#destroyObject

Free the native resources associated with this object. This will be done automatically on garbage collection if not called explicitly.



32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/bran/libuv/reactor.rb', line 32

def destroy
  if @finalizer
    @finalizer.call
    ObjectSpace.undefine_finalizer(self)
  end
  @ptr = @finalizer = nil
  
  @read_polls   = @write_polls = \
  @signal_polls = @timer_polls = nil
  
  self
end

#pop_readable(fd) ⇒ Object

Remove the next readable handler for the given fd.



117
118
119
# File 'lib/bran/libuv/reactor.rb', line 117

def pop_readable(fd)
  @read_polls.pop(Integer(fd))
end

#pop_signalable(signo) ⇒ Object

Remove the next signal handler for the given signal.



127
128
129
130
131
# File 'lib/bran/libuv/reactor.rb', line 127

def pop_signalable(signo)
  signo = Signal.list.fetch(signo.to_s) unless signo.is_a?(Integer)
  
  @signal_polls.pop(signo)
end

#pop_timable(ident) ⇒ Object

Remove the next timer handler for the given timer.



134
135
136
# File 'lib/bran/libuv/reactor.rb', line 134

def pop_timable(ident)
  @timer_polls.pop(ident)
end

#pop_writable(fd) ⇒ Object

Remove the next writable handler for the given fd.



122
123
124
# File 'lib/bran/libuv/reactor.rb', line 122

def pop_writable(fd)
  @write_polls.pop(Integer(fd))
end

#ptrObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Raises:



46
47
48
49
# File 'lib/bran/libuv/reactor.rb', line 46

def ptr
  raise DestroyedError unless @ptr
  @ptr
end

#push_readable(fd, handler, persistent = true) ⇒ Object

Push the given handler for the given fd, adding if necessary. If persistent is false, the handler will be popped after one trigger.



92
93
94
# File 'lib/bran/libuv/reactor.rb', line 92

def push_readable(fd, handler, persistent = true)
  @read_polls.push(Integer(fd), handler, persistent)
end

#push_signalable(signo, handler, persistent = true) ⇒ Object

Push the given handler for the given signo, adding if necessary. If persistent is false, the handler will be popped after one trigger.



104
105
106
107
108
# File 'lib/bran/libuv/reactor.rb', line 104

def push_signalable(signo, handler, persistent = true)
  signo = Signal.list.fetch(signo.to_s) unless signo.is_a?(Integer)
  
  @signal_polls.push(signo, handler, persistent)
end

#push_timable(ident, timeout, handler, persistent = true) ⇒ Object

Push the given handler for the given timer id, adding if necessary. If persistent is false, the handler will be popped after one trigger.



112
113
114
# File 'lib/bran/libuv/reactor.rb', line 112

def push_timable(ident, timeout, handler, persistent = true)
  @timer_polls.push(ident, handler, persistent, timeout)
end

#push_writable(fd, handler, persistent = true) ⇒ Object

Push the given handler for the given fd, adding if necessary. If persistent is false, the handler will be popped after one trigger.



98
99
100
# File 'lib/bran/libuv/reactor.rb', line 98

def push_writable(fd, handler, persistent = true)
  @write_polls.push(Integer(fd), handler, persistent)
end

#rescue_abortObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Capture exceptions raised from callbacks, stopping the loop, capturing the exception to be re-raised outside the loop in #run!.



63
64
65
66
67
68
# File 'lib/bran/libuv/reactor.rb', line 63

def rescue_abort
  yield
rescue Exception => ex
  @abort_exception = ex
  stop!
end

#run!Object

Run the libuv event loop in default (blocking) mode, running until stopped or until all handles are removed.

Raises:

  • (@abort_exception)


72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/bran/libuv/reactor.rb', line 72

def run!
  @abort_exception = nil
  @abort_signal = nil
  
  rc = FFI.uv_run(ptr, :default)
  
  # If an exception or signal caused the stop, re-raise it here.
  raise @abort_exception if @abort_exception
  Process.kill(@abort_signal, Process.pid) if @abort_signal
  
  Util.error_check "running the loop in blocking mode", rc
end

#stop!Object

Return true if there are active handles or request in the loop.



86
87
88
# File 'lib/bran/libuv/reactor.rb', line 86

def stop!
  FFI.uv_stop(ptr)
end

#timer_oneshot(time, &block) ⇒ Object

Start a timer to run the given block after the given timeout. The timer will be run just once, starting now.



140
141
142
# File 'lib/bran/libuv/reactor.rb', line 140

def timer_oneshot(time, &block)
  push_timable(block.object_id, time, block, false)
end

#timer_oneshot_wake(time, fiber) ⇒ Object

Start a timer to wake the given fiber after the given timeout. The timer will be run just once, starting now.



146
147
148
# File 'lib/bran/libuv/reactor.rb', line 146

def timer_oneshot_wake(time, fiber)
  timer_oneshot(time) { fiber.resume } # TODO: optimize this case, but be careful of ident uniqueness
end