Module: Rbgo::CoRun

Defined in:
lib/rbgo/corun.rb

Defined Under Namespace

Classes: Routine, Scheduler

Constant Summary collapse

IS_CORUN_FIBER =
:is_corun_fiber_bbc0f70e
LOCAL_TASK_QUEUES =
:local_task_queues_bbc0f70e
YIELD_IO_OPERATION =
:yield_bbc0f70e

Class Method Summary collapse

Class Method Details

.accept_from(sock) ⇒ Object



72
73
74
75
76
77
78
79
# File 'lib/rbgo/corun.rb', line 72

def self.accept_from(sock)
  if is_in_corun_fiber?
    receipt = Scheduler.instance.io_machine.do_socket_accept(sock)
    Fiber.yield [YIELD_IO_OPERATION, receipt]
  else
    sock.accept
  end
end

.connect_to(sock, remote_sockaddr:) ⇒ Object



81
82
83
84
85
86
87
88
# File 'lib/rbgo/corun.rb', line 81

def self.connect_to(sock, remote_sockaddr:)
  if is_in_corun_fiber?
    receipt = Scheduler.instance.io_machine.do_socket_connect(sock, remote_sockaddr: remote_sockaddr)
    Fiber.yield [YIELD_IO_OPERATION, receipt]
  else
    sock.connect(remote_sockaddr)
  end
end

.have_other_task_on_thread?Boolean

Returns:

  • (Boolean)


20
21
22
23
# File 'lib/rbgo/corun.rb', line 20

def self.have_other_task_on_thread?
  queues = Thread.current.thread_variable_get(LOCAL_TASK_QUEUES)
  queues&.any? { |q| !q.empty? }
end

.is_in_corun_fiber?Boolean

Returns:

  • (Boolean)


16
17
18
# File 'lib/rbgo/corun.rb', line 16

def self.is_in_corun_fiber?
  !!Thread.current[IS_CORUN_FIBER]
end

.read_from(io, length: nil) ⇒ Object



42
43
44
45
46
47
48
49
50
# File 'lib/rbgo/corun.rb', line 42

def self.read_from(io, length: nil)
  if is_in_corun_fiber?
    return "" if length == 0
    receipt = Scheduler.instance.io_machine.do_read(io, length: length)
    Fiber.yield [YIELD_IO_OPERATION, receipt]
  else
    io.read(length)
  end
end

.read_line_from(io, sep: $/, limit: nil) ⇒ Object



52
53
54
55
56
57
58
59
60
# File 'lib/rbgo/corun.rb', line 52

def self.read_line_from(io, sep: $/, limit: nil)
  if is_in_corun_fiber?
    return "" if limit == 0
    receipt = Scheduler.instance.io_machine.do_read_line(io, sep: sep, limit: limit)
    Fiber.yield [YIELD_IO_OPERATION, receipt]
  else
    io.readline(sep, limit)
  end
end

.read_partial_from(io, maxlen:) ⇒ Object



62
63
64
65
66
67
68
69
70
# File 'lib/rbgo/corun.rb', line 62

def self.read_partial_from(io, maxlen:)
  if is_in_corun_fiber?
    return "" if maxlen == 0
    receipt = Scheduler.instance.io_machine.do_read_partial(io, maxlen: maxlen)
    Fiber.yield [YIELD_IO_OPERATION, receipt]
  else
    io.readpartial(maxlen)
  end
end

.recv_from(sock, maxlen:, flags: 0) ⇒ Object



90
91
92
93
94
95
96
97
# File 'lib/rbgo/corun.rb', line 90

def self.recv_from(sock, maxlen:, flags: 0)
  if is_in_corun_fiber?
    receipt = Scheduler.instance.io_machine.do_socket_recv(sock, maxlen: maxlen, flags: flags)
    Fiber.yield [YIELD_IO_OPERATION, receipt]
  else
    sock.recv(maxlen, flags)
  end
end

.recvmsg_from(sock, maxdatalen: nil, flags: 0, maxcontrollen: nil, opts: {}) ⇒ Object



99
100
101
102
103
104
105
106
# File 'lib/rbgo/corun.rb', line 99

def self.recvmsg_from(sock, maxdatalen: nil, flags: 0, maxcontrollen: nil, opts: {})
  if is_in_corun_fiber?
    receipt = Scheduler.instance.io_machine.do_socket_recvmsg(sock, maxdatalen: maxdatalen, flags: flags, maxcontrollen: maxcontrollen, opts: opts)
    Fiber.yield [YIELD_IO_OPERATION, receipt]
  else
    sock.recvmsg(maxdatalen, flags, maxcontrollen, opts)
  end
end

.sendmsg_to(sock, mesg, flags: 0, dest_sockaddr: nil, controls: []) ⇒ Object



108
109
110
111
112
113
114
115
# File 'lib/rbgo/corun.rb', line 108

def self.sendmsg_to(sock, mesg, flags: 0, dest_sockaddr: nil, controls: [])
  if is_in_corun_fiber?
    receipt = Scheduler.instance.io_machine.do_socket_sendmsg(sock, mesg, flags: flags, dest_sockaddr: dest_sockaddr, controls: controls)
    Fiber.yield [YIELD_IO_OPERATION, receipt]
  else
    sock.sendmsg(mesg, flags, dest_sockaddr, *controls)
  end
end

.write_to(io, str:) ⇒ Object



117
118
119
120
121
122
123
124
# File 'lib/rbgo/corun.rb', line 117

def self.write_to(io, str:)
  if is_in_corun_fiber?
    receipt = Scheduler.instance.io_machine.do_write(io, str: str)
    Fiber.yield [YIELD_IO_OPERATION, receipt]
  else
    io.write(str)
  end
end

.yield_io(&blk) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/rbgo/corun.rb', line 25

def self.yield_io(&blk)
  if is_in_corun_fiber?
    receipt = IOReceipt.new([:yield_io])
    CoRun::Routine.new(new_thread: true, queue_tag: :none) do
      begin
        res = blk&.call
        receipt.res = res
      ensure
        receipt.notify
      end
    end
    Fiber.yield [YIELD_IO_OPERATION, receipt]
  else
    blk&.call
  end
end