Class: Rex::IO::FiberScheduler

Inherits:
Object
  • Object
show all
Defined in:
lib/rex/io/fiber_scheduler.rb

Overview

This fiber scheduler is heavily base on blog.monotone.dev/ruby/2020/12/25/ruby-3-fiber.html

Instance Method Summary collapse

Constructor Details

#initializeFiberScheduler

Returns a new instance of FiberScheduler.



11
12
13
14
15
16
17
18
19
20
# File 'lib/rex/io/fiber_scheduler.rb', line 11

def initialize
  @readable = {}
  @writable = {}
  @waiting = {}
  @ready = []
  @pending = []
  @blocking = 0
  @urgent = Rex::Compat.pipe
  @mutex = Mutex.new
end

Instance Method Details

#block(blocker, timeout = nil) ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/rex/io/fiber_scheduler.rb', line 102

def block(blocker, timeout = nil)
  if timeout
    @waiting[Fiber.current] = current_time + timeout
    begin
      Fiber.yield
    ensure
      @waiting.delete(Fiber.current)
    end
  else
    @blocking += 1
    begin
      Fiber.yield
    ensure
      @blocking -= 1
    end
  end
end

#closeObject



126
127
128
129
130
# File 'lib/rex/io/fiber_scheduler.rb', line 126

def close
  run
  @urgent.each(&:close)
  @urgent = nil
end

#closed?Boolean

Returns:

  • (Boolean)


132
133
134
# File 'lib/rex/io/fiber_scheduler.rb', line 132

def closed?
  @urgent.nil?
end

#fiber(&block) ⇒ Object



136
137
138
139
140
# File 'lib/rex/io/fiber_scheduler.rb', line 136

def fiber(&block)
  fiber = Fiber.new(blocking: false, &block)
  fiber.resume
  fiber
end

#io_wait(io, events, timeout) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
# File 'lib/rex/io/fiber_scheduler.rb', line 85

def io_wait(io, events, timeout)
  unless (events & ::IO::READABLE).zero?
    @readable[io] = Fiber.current
  end
  unless (events & ::IO::WRITABLE).zero?
    @writable[io] = Fiber.current
  end

  Fiber.yield
  events
end

#kernel_sleep(duration = nil) ⇒ Object



97
98
99
100
# File 'lib/rex/io/fiber_scheduler.rb', line 97

def kernel_sleep(duration = nil)
  block(:sleep, duration)
  true
end

#runObject



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/rex/io/fiber_scheduler.rb', line 31

def run
  while @readable.any? or @writable.any? or @waiting.any? or @blocking.positive? or @ready.any? or @pending.any?
    # Start any pending fibers
    pending_blocks = []
    @mutex.synchronize do
      pending_blocks = @pending.dup
      @pending.clear
    end

    pending_blocks.each do |block|
      fiber = Fiber.new(blocking: false, &block)
      fiber.resume
    end

    begin
      readable, writable = ::IO.select(@readable.keys + [@urgent.first], @writable.keys, [], 0.1)
    rescue ::IOError
      cleanup_closed_ios
      next
    end

    # Drain the urgent pipe
    if readable&.include?(@urgent.first)
      @urgent.first.read_nonblock(1024) rescue nil
    end

    readable&.each do |io|
      next if io == @urgent.first

      if fiber = @readable.delete(io)
        fiber.resume
      end
    end

    writable&.each do |io|
      if fiber = @writable.delete(io)
        fiber.resume
      end
    end

    @waiting.keys.each do |fiber|
      if current_time > @waiting[fiber]
        @waiting.delete(fiber)
        fiber.resume
      end
    end

    ready, @ready = @ready, []
    ready.each do |fiber|
      fiber.resume
    end
  end
end

#schedule_fiber(&block) ⇒ Object

This allows the fiber to be scheduled in the #run thread from another thread



23
24
25
26
27
28
29
# File 'lib/rex/io/fiber_scheduler.rb', line 23

def schedule_fiber(&block)
  @mutex.synchronize do
    @pending << block
  end
  # Wake up the scheduler
  @urgent.last.write_nonblock('.') rescue nil
end

#unblock(blocker, fiber) ⇒ Object



120
121
122
123
124
# File 'lib/rex/io/fiber_scheduler.rb', line 120

def unblock(blocker, fiber)
  @ready << fiber
  io = @urgent.last
  io.write_nonblock('.')
end