Class: Evt::Bundled

Inherits:
Object
  • Object
show all
Defined in:
lib/evt/backends/bundled.rb,
ext/evt/evt.c

Direct Known Subclasses

Epoll, Iocp, Kqueue, Select, Uring

Defined Under Namespace

Classes: Payload

Constant Summary collapse

MAXIMUM_TIMEOUT =
5000
COLLECT_COUNTER_MAX =
16384

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Bundled

Returns a new instance of Bundled.



7
8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/evt/backends/bundled.rb', line 7

# Build an empty scheduler state and then hand over to the backend-specific
# +init_selector+ hook.
def initialize
  # Lookup tables: @readable/@writable/@iovs are keyed by IO, @waiting is
  # keyed by fiber and stores the deadline of a timed wait.
  @readable, @writable, @waiting, @iovs = {}, {}, {}, {}

  # @lock guards @ready; @blocking counts fibers parked without a timeout.
  @lock = Mutex.new
  @blocking = 0
  @ready = []

  # Number of #collect calls skipped since the last real sweep.
  @collect_counter = 0

  init_selector
end

Instance Attribute Details

#readable ⇒ Object (readonly)

Returns the value of attribute readable.



21
22
23
# File 'lib/evt/backends/bundled.rb', line 21

# Read-only accessor for the table of fibers parked on readable IO.
#
# @return [Object] the current value of +@readable+
def readable = @readable

#waiting ⇒ Object (readonly)

Returns the value of attribute waiting.



23
24
25
# File 'lib/evt/backends/bundled.rb', line 23

# Read-only accessor for the table of fibers parked with a deadline.
#
# @return [Object] the current value of +@waiting+
def waiting = @waiting

#writable ⇒ Object (readonly)

Returns the value of attribute writable.



22
23
24
# File 'lib/evt/backends/bundled.rb', line 22

# Read-only accessor for the table of fibers parked on writable IO.
#
# @return [Object] the current value of +@writable+
def writable = @writable

Class Method Details

.epoll_backendObject

.kqueue_backendObject

.select_backendObject

.uring_backendObject

Instance Method Details

#block(blocker, timeout = nil) ⇒ Object

Block the calling fiber.



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/evt/backends/bundled.rb', line 120

# Suspend the calling fiber until it is resumed, optionally recording a
# deadline so the run loop can wake it.
#
# @param blocker [Object] what the fiber is blocked on (not used here)
# @param timeout [Numeric, nil] seconds until the deadline; nil/false parks
#   the fiber without a deadline
# @return [Object] the value the fiber is resumed with
def block(blocker, timeout = nil)
  unless timeout
    # No deadline: only bump the counter of indefinitely parked fibers.
    @blocking += 1
    begin
      return Fiber.yield
    ensure
      @blocking -= 1
    end
  end

  # Deadline requested: store it under this fiber for the run loop.
  current = Fiber.current
  @waiting[current] = current_time + timeout
  begin
    Fiber.yield
  ensure
    # Always drop the entry, even when the fiber is resumed by an exception.
    @waiting.delete(current)
  end
end

#close ⇒ Object

Invoked when the thread exits.



149
150
151
# File 'lib/evt/backends/bundled.rb', line 149

# Hook invoked when the thread exits: drains the scheduler by delegating to
# #run and returns whatever #run returns.
def close
  run
end

#collect(force = false) ⇒ Object

Collect closed streams in readables and writables



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/evt/backends/bundled.rb', line 154

# Drop closed IO objects from the readable, writable and iov tables.
#
# The sweep is throttled: unless +force+ is truthy, a call only increments
# @collect_counter and returns until the counter has reached
# COLLECT_COUNTER_MAX.
#
# @param force [Boolean] sweep now regardless of the counter
# @return [Array, nil] nil when the call was throttled; otherwise the key
#   snapshot of @iovs taken before pruning
def collect(force=false)
  unless @collect_counter >= COLLECT_COUNTER_MAX || force
    @collect_counter += 1
    return
  end

  @collect_counter = 0

  # Iterate over a snapshot of the keys so the table can be mutated safely.
  prune = lambda do |table|
    table.keys.each { |io| table.delete(io) if io.closed? }
  end

  prune.call(@readable)
  prune.call(@writable)
  prune.call(@iovs)
end

#current_time ⇒ Object



88
89
90
# File 'lib/evt/backends/bundled.rb', line 88

# Current reading of the monotonic clock, used for computing deadlines.
#
# @return [Float] seconds, as reported by Process.clock_gettime
def current_time
  # :float_second is the default unit; spelled out here for clarity.
  Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_second)
end

#epoll_deregisterObject

#epoll_init_selectorObject

#epoll_registerObject

#epoll_waitObject

#fiber(&block) ⇒ Object

Intercept the creation of a non-blocking fiber.



177
178
179
180
181
# File 'lib/evt/backends/bundled.rb', line 177

# Create a non-blocking fiber for the given block, start it immediately and
# hand it back to the caller.
#
# @yield the body to run inside the new fiber
# @return [Fiber] the fiber, after its first resume has returned
def fiber(&block)
  Fiber.new(blocking: false, &block).tap(&:resume)
end

#io_wait(io, events, duration) ⇒ Object

Wait for the given file descriptor to match the specified events within the specified timeout.



98
99
100
101
102
103
104
105
106
# File 'lib/evt/backends/bundled.rb', line 98

# Park the calling fiber until the run loop resumes it for +io+.
#
# The fiber is recorded in @readable and/or @writable according to +events+,
# the IO is registered with the backend, and the fiber yields. Cleanup runs
# in +ensure+ so that an interrupted wait (for example via Fiber#raise) does
# not leave the IO registered, and so that an entry for an event that did
# not fire is not left behind pointing at this fiber.
#
# @param io [IO] the IO object to wait on
# @param events [Integer] bit mask of IO::READABLE / IO::WRITABLE
# @param duration [Numeric, nil] accepted for interface compatibility;
#   this implementation does not use it, so no timeout is applied
# @return [true] once the fiber has been resumed normally
def io_wait(io, events, duration)
  # TODO: IO::PRIORITY
  current = Fiber.current
  @readable[io] = current unless (events & IO::READABLE).zero?
  @writable[io] = current unless (events & IO::WRITABLE).zero?

  begin
    register(io, events)
    begin
      Fiber.yield
    ensure
      # Only deregister what was successfully registered.
      deregister(io)
    end
  ensure
    # Remove our own leftovers only; another fiber may have re-registered
    # the same IO while we were suspended.
    @readable.delete(io) if @readable[io].equal?(current)
    @writable.delete(io) if @writable[io].equal?(current)
  end

  true
end

#kernel_sleep(duration = nil) ⇒ Object

Sleep the current task for the specified duration, or forever if not specified.

Parameters:

  • duration (Numeric) (defaults to: nil)

    The amount of time to sleep in seconds.



111
112
113
114
# File 'lib/evt/backends/bundled.rb', line 111

# Put the current fiber to sleep by delegating to #block with the :sleep
# blocker.
#
# @param duration [Numeric, nil] seconds to sleep; nil is passed through to
#   #block unchanged
# @return [true] once #block has returned
def kernel_sleep(duration = nil)
  block(:sleep, duration)
  true
end

#kqueue_init_selectorObject

#kqueue_registerObject

#kqueue_waitObject

#next_timeout ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
# File 'lib/evt/backends/bundled.rb', line 25

# Compute how long the selector may block before the earliest deadline in
# @waiting is due.
#
# @return [Numeric] milliseconds: 0 when the earliest deadline has passed,
#   the remaining time when it is below MAXIMUM_TIMEOUT, and
#   MAXIMUM_TIMEOUT otherwise (including when nothing is waiting)
def next_timeout
  earliest = @waiting.values.min
  return MAXIMUM_TIMEOUT unless earliest

  # Deadlines are stored in seconds; convert the remainder to milliseconds.
  remaining_ms = (earliest - current_time) * 1000

  if remaining_ms < 0
    0
  elsif remaining_ms < MAXIMUM_TIMEOUT
    remaining_ms
  else
    MAXIMUM_TIMEOUT
  end
end

#run ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/evt/backends/bundled.rb', line 37

# Drive the event loop until no fiber is waiting on IO, an iov, a deadline,
# or an untimed block.
#
# Each iteration:
# 1. asks the backend (+wait+) for ready IO and completed iovs,
# 2. resumes the fibers recorded for them,
# 3. gives #collect a chance to prune closed IO,
# 4. resumes fibers whose deadline in @waiting has passed,
# 5. resumes fibers queued in @ready by #unblock.
#
# Every resume goes through the same liveness guard. A fiber that has
# already finished (for instance one recorded as both readable and writable
# for the same IO) is skipped rather than raising FiberError and aborting
# the loop.
#
# @return [nil]
def run
  # Resume +fiber+ with +args+ only if it is a Fiber that can still run.
  wake = lambda do |fiber, *args|
    fiber.resume(*args) if fiber.is_a?(Fiber) && fiber.alive?
  end

  while @readable.any? || @writable.any? || @waiting.any? || @iovs.any? || @blocking.positive?
    # As consumed here, wait yields up to three collections; any may be nil.
    readable, writable, iovs = wait

    readable&.each { |io| wake.call(@readable.delete(io)) }
    writable&.each { |io| wake.call(@writable.delete(io)) }
    # Each iov entry is an [io, result] pair; the result is passed to the fiber.
    iovs&.each { |io, ret| wake.call(@iovs.delete(io), ret) }

    collect

    if @waiting.any?
      now = current_time
      # Swap in a fresh table so fibers resumed below can add new deadlines
      # without mutating the hash being iterated.
      pending, @waiting = @waiting, {}

      pending.each do |fiber, deadline|
        if deadline <= now
          wake.call(fiber)
        else
          @waiting[fiber] = deadline
        end
      end
    end

    if @ready.any?
      # Take the whole queue under the lock, then resume outside of it.
      ready = @lock.synchronize do
        batch = @ready
        @ready = []
        batch
      end

      ready.each { |fiber| wake.call(fiber) }
    end
  end
end

#select_waitObject

#unblock(blocker, fiber) ⇒ Object

Unblock the specified fiber.



142
143
144
145
146
# File 'lib/evt/backends/bundled.rb', line 142

# Queue +fiber+ so the run loop resumes it on its next pass. The queue is
# modified while holding @lock.
#
# @param blocker [Object] what the fiber was blocked on (not used here)
# @param fiber [Fiber] the fiber to make runnable
# @return [Array] the ready queue after the fiber has been appended
def unblock(blocker, fiber)
  @lock.synchronize { @ready.push(fiber) }
end

#uring_init_selectorObject

#uring_io_readObject

#uring_io_writeObject

#uring_registerObject

#uring_waitObject