Module: Rbgo::CoRun

Defined in:
lib/rbgo/corun.rb

Defined Under Namespace

Classes: Routine, Scheduler

Constant Summary collapse

IS_CORUN_FIBER =
:is_corun_fiber_bbc0f70e
LOCAL_TASK_QUEUES =
:local_task_queues_bbc0f70e
YIELD_IO_OPERATION =
:yield_bbc0f70e

Class Method Summary collapse

Class Method Details

.accept_from(sock) ⇒ Object



53
54
55
56
57
58
59
60
# File 'lib/rbgo/corun.rb', line 53

# Accepts a connection on +sock+, going through the CoRun scheduler when
# called from inside a corun fiber.
#
# Outside a corun fiber this is a plain +sock.accept+. Inside one, the accept
# is handed to the scheduler's io_machine and the current fiber is suspended
# with a +[YIELD_IO_OPERATION, receipt]+ pair; the method then returns
# whatever value the fiber is resumed with (presumably the accept result --
# confirm against Scheduler).
#
# @param sock [Object] object to accept on
# @return [Object] +sock.accept+ result, or the value the fiber is resumed with
def self.accept_from(sock)
  return sock.accept unless is_in_corun_fiber?

  pending = Scheduler.instance.io_machine.do_socket_accept(sock)
  Fiber.yield([YIELD_IO_OPERATION, pending])
end

.connect_to(sock, remote_sockaddr:) ⇒ Object



62
63
64
65
66
67
68
69
# File 'lib/rbgo/corun.rb', line 62

# Connects +sock+ to +remote_sockaddr+, going through the CoRun scheduler
# when called from inside a corun fiber.
#
# Outside a corun fiber this calls +sock.connect(remote_sockaddr)+ directly.
# Inside one, the connect is handed to the scheduler's io_machine and the
# fiber is suspended with a +[YIELD_IO_OPERATION, receipt]+ pair; the return
# value is whatever the fiber is resumed with.
#
# @param sock [Object] object to connect
# @param remote_sockaddr [Object] destination address, passed through unchanged
# @return [Object] +sock.connect+ result, or the value the fiber is resumed with
def self.connect_to(sock, remote_sockaddr:)
  return sock.connect(remote_sockaddr) unless is_in_corun_fiber?

  io_machine = Scheduler.instance.io_machine
  pending = io_machine.do_socket_connect(sock, remote_sockaddr: remote_sockaddr)
  Fiber.yield([YIELD_IO_OPERATION, pending])
end

.have_other_task_on_thread? ⇒ Boolean

Returns:

  • (Boolean)


18
19
20
21
# File 'lib/rbgo/corun.rb', line 18

# Reports whether any task queue registered on the current thread is non-empty.
#
# The queues are read from the thread variable named by LOCAL_TASK_QUEUES.
# When that variable is unset the answer is +false+ rather than +nil+, so the
# predicate always returns a real boolean as documented.
#
# @return [Boolean] true if at least one queue is not +empty?+, false otherwise
def self.have_other_task_on_thread?
  queues = Thread.current.thread_variable_get(LOCAL_TASK_QUEUES)
  # No queues registered on this thread means there is nothing else to run.
  return false unless queues

  queues.any? { |queue| !queue.empty? }
end

.is_in_corun_fiber? ⇒ Boolean

Returns:

  • (Boolean)


14
15
16
# File 'lib/rbgo/corun.rb', line 14

# Tells whether the calling code is running inside a corun fiber.
#
# The marker is read with Thread#[] under the IS_CORUN_FIBER key. Thread#[]
# storage is fiber-local in Ruby, so the flag is seen only by the fiber it
# was set on.
#
# @return [Boolean] true when the marker is truthy, false otherwise
def self.is_in_corun_fiber?
  Thread.current[IS_CORUN_FIBER] ? true : false
end

.read_from(io, length: nil) ⇒ Object



23
24
25
26
27
28
29
30
31
# File 'lib/rbgo/corun.rb', line 23

# Reads from +io+, going through the CoRun scheduler when called from inside
# a corun fiber.
#
# Outside a corun fiber this is +io.read(length)+. Inside one, a zero
# +length+ short-circuits to an empty string without suspending; otherwise
# the read is handed to the scheduler's io_machine and the fiber is suspended
# with a +[YIELD_IO_OPERATION, receipt]+ pair, returning whatever value the
# fiber is resumed with.
#
# @param io [Object] object to read from
# @param length [Integer, nil] passed through to the read; nil is forwarded as-is
# @return [Object] +io.read+ result, "" for a zero length inside a fiber, or
#   the value the fiber is resumed with
def self.read_from(io, length: nil)
  return io.read(length) unless is_in_corun_fiber?
  return "" if length == 0

  pending = Scheduler.instance.io_machine.do_read(io, length: length)
  Fiber.yield([YIELD_IO_OPERATION, pending])
end

.read_line_from(io, sep: $/, limit: nil) ⇒ Object



33
34
35
36
37
38
39
40
41
# File 'lib/rbgo/corun.rb', line 33

# Reads one line from +io+, going through the CoRun scheduler when called
# from inside a corun fiber.
#
# Outside a corun fiber this is +io.readline(sep, limit)+. Inside one, a
# zero +limit+ short-circuits to an empty string without suspending;
# otherwise the read is handed to the scheduler's io_machine and the fiber is
# suspended with a +[YIELD_IO_OPERATION, receipt]+ pair, returning whatever
# value the fiber is resumed with.
#
# @param io [Object] object to read from
# @param sep [String, nil] line separator; defaults to the current value of $/
# @param limit [Integer, nil] passed through as the readline limit
# @return [Object] +io.readline+ result, "" for a zero limit inside a fiber,
#   or the value the fiber is resumed with
def self.read_line_from(io, sep: $/, limit: nil)
  return io.readline(sep, limit) unless is_in_corun_fiber?
  return "" if limit == 0

  io_machine = Scheduler.instance.io_machine
  pending = io_machine.do_read_line(io, sep: sep, limit: limit)
  Fiber.yield([YIELD_IO_OPERATION, pending])
end

.read_partial_from(io, maxlen:) ⇒ Object



43
44
45
46
47
48
49
50
51
# File 'lib/rbgo/corun.rb', line 43

# Performs a partial read of up to +maxlen+ from +io+, going through the
# CoRun scheduler when called from inside a corun fiber.
#
# Outside a corun fiber this is +io.readpartial(maxlen)+. Inside one, a zero
# +maxlen+ short-circuits to an empty string without suspending; otherwise
# the read is handed to the scheduler's io_machine and the fiber is suspended
# with a +[YIELD_IO_OPERATION, receipt]+ pair, returning whatever value the
# fiber is resumed with.
#
# @param io [Object] object to read from
# @param maxlen [Integer] passed through as the readpartial maximum
# @return [Object] +io.readpartial+ result, "" for a zero maxlen inside a
#   fiber, or the value the fiber is resumed with
def self.read_partial_from(io, maxlen:)
  return io.readpartial(maxlen) unless is_in_corun_fiber?
  return "" if maxlen == 0

  pending = Scheduler.instance.io_machine.do_read_partial(io, maxlen: maxlen)
  Fiber.yield([YIELD_IO_OPERATION, pending])
end

.recv_from(sock, maxlen:, flags: 0) ⇒ Object



71
72
73
74
75
76
77
78
# File 'lib/rbgo/corun.rb', line 71

# Receives up to +maxlen+ from +sock+, going through the CoRun scheduler when
# called from inside a corun fiber.
#
# Outside a corun fiber this is +sock.recv(maxlen, flags)+. Inside one, the
# receive is handed to the scheduler's io_machine and the fiber is suspended
# with a +[YIELD_IO_OPERATION, receipt]+ pair; the return value is whatever
# the fiber is resumed with.
#
# @param sock [Object] object to receive from
# @param maxlen [Integer] passed through as the recv maximum
# @param flags [Integer] recv flags, 0 by default
# @return [Object] +sock.recv+ result, or the value the fiber is resumed with
def self.recv_from(sock, maxlen:, flags: 0)
  return sock.recv(maxlen, flags) unless is_in_corun_fiber?

  io_machine = Scheduler.instance.io_machine
  pending = io_machine.do_socket_recv(sock, maxlen: maxlen, flags: flags)
  Fiber.yield([YIELD_IO_OPERATION, pending])
end

.recvmsg_from(sock, maxdatalen: nil, flags: 0, maxcontrollen: nil, opts: {}) ⇒ Object



80
81
82
83
84
85
86
87
# File 'lib/rbgo/corun.rb', line 80

# Receives a message from +sock+ via recvmsg, going through the CoRun
# scheduler when called from inside a corun fiber.
#
# Outside a corun fiber this calls +sock.recvmsg+ directly. Inside one, the
# operation is handed to the scheduler's io_machine and the fiber is
# suspended with a +[YIELD_IO_OPERATION, receipt]+ pair; the return value is
# whatever the fiber is resumed with.
#
# @param sock [Object] object to receive from
# @param maxdatalen [Integer, nil] maximum data length, forwarded as-is
# @param flags [Integer] recvmsg flags, 0 by default
# @param maxcontrollen [Integer, nil] maximum control length, forwarded as-is
# @param opts [Hash] extra recvmsg options with symbol keys (e.g. +scm_rights:+)
# @return [Object] +sock.recvmsg+ result, or the value the fiber is resumed with
def self.recvmsg_from(sock, maxdatalen: nil, flags: 0, maxcontrollen: nil, opts: {})
  if is_in_corun_fiber?
    receipt = Scheduler.instance.io_machine.do_socket_recvmsg(sock, maxdatalen: maxdatalen, flags: flags, maxcontrollen: maxcontrollen, opts: opts)
    Fiber.yield [YIELD_IO_OPERATION, receipt]
  else
    # BasicSocket#recvmsg takes its options as keywords, so the hash has to be
    # splatted. Passing it as a fourth positional argument raises
    # ArgumentError on Ruby 3, even when the hash is empty.
    sock.recvmsg(maxdatalen, flags, maxcontrollen, **opts)
  end
end

.sendmsg_to(sock, mesg, flags: 0, dest_sockaddr: nil, controls: []) ⇒ Object



89
90
91
92
93
94
95
96
# File 'lib/rbgo/corun.rb', line 89

# Sends +mesg+ on +sock+ via sendmsg, going through the CoRun scheduler when
# called from inside a corun fiber.
#
# Outside a corun fiber this is
# +sock.sendmsg(mesg, flags, dest_sockaddr, *controls)+. Inside one, the send
# is handed to the scheduler's io_machine and the fiber is suspended with a
# +[YIELD_IO_OPERATION, receipt]+ pair; the return value is whatever the
# fiber is resumed with.
#
# @param sock [Object] object to send on
# @param mesg [Object] message payload, forwarded as-is
# @param flags [Integer] sendmsg flags, 0 by default
# @param dest_sockaddr [Object, nil] destination address, forwarded as-is
# @param controls [Array] control messages; splatted on the direct path
# @return [Object] +sock.sendmsg+ result, or the value the fiber is resumed with
def self.sendmsg_to(sock, mesg, flags: 0, dest_sockaddr: nil, controls: [])
  return sock.sendmsg(mesg, flags, dest_sockaddr, *controls) unless is_in_corun_fiber?

  io_machine = Scheduler.instance.io_machine
  pending = io_machine.do_socket_sendmsg(
    sock, mesg, flags: flags, dest_sockaddr: dest_sockaddr, controls: controls
  )
  Fiber.yield([YIELD_IO_OPERATION, pending])
end

.write_to(io, str:) ⇒ Object



98
99
100
101
102
103
104
105
# File 'lib/rbgo/corun.rb', line 98

# Writes +str+ to +io+, going through the CoRun scheduler when called from
# inside a corun fiber.
#
# Outside a corun fiber this is +io.write(str)+. Inside one, the write is
# handed to the scheduler's io_machine and the fiber is suspended with a
# +[YIELD_IO_OPERATION, receipt]+ pair; the return value is whatever the
# fiber is resumed with.
#
# @param io [Object] object to write to
# @param str [Object] data to write, forwarded as-is
# @return [Object] +io.write+ result, or the value the fiber is resumed with
def self.write_to(io, str:)
  return io.write(str) unless is_in_corun_fiber?

  pending = Scheduler.instance.io_machine.do_write(io, str: str)
  Fiber.yield([YIELD_IO_OPERATION, pending])
end