Module: Rbgo::CoRun
- Defined in:
- lib/rbgo/corun.rb
Defined Under Namespace
Constant Summary
- IS_CORUN_FIBER =
:is_corun_fiber_bbc0f70e
- LOCAL_TASK_QUEUES =
:local_task_queues_bbc0f70e
- YIELD_IO_OPERATION =
:yield_bbc0f70e
Class Method Summary
- .accept_from(sock) ⇒ Object
- .connect_to(sock, remote_sockaddr:) ⇒ Object
- .have_other_task_on_thread? ⇒ Boolean
- .is_in_corun_fiber? ⇒ Boolean
- .read_from(io, length: nil) ⇒ Object
- .read_line_from(io, sep: $/, limit: nil) ⇒ Object
- .read_partial_from(io, maxlen:) ⇒ Object
- .recv_from(sock, maxlen:, flags: 0) ⇒ Object
- .recvmsg_from(sock, maxdatalen: nil, flags: 0, maxcontrollen: nil, opts: {}) ⇒ Object
- .sendmsg_to(sock, mesg, flags: 0, dest_sockaddr: nil, controls: []) ⇒ Object
- .write_to(io, str:) ⇒ Object
- .yield_io(&blk) ⇒ Object
Class Method Details
.accept_from(sock) ⇒ Object
72 73 74 75 76 77 78 79 |
# File 'lib/rbgo/corun.rb', line 72
#
# Accepts a connection on +sock+.
#
# Outside a CoRun fiber this is a plain +sock.accept+. Inside one, the accept
# is registered with the scheduler's IO machine and the current fiber yields
# +[YIELD_IO_OPERATION, receipt]+; whatever value the fiber is resumed with
# becomes the return value (presumably the accept result -- confirm against
# the scheduler).
#
# @param sock [Object] socket to accept on
# @return [Object] result of +sock.accept+, or the value passed on resume
def self.accept_from(sock)
  return sock.accept unless is_in_corun_fiber?

  pending = Scheduler.instance.io_machine.do_socket_accept(sock)
  Fiber.yield [YIELD_IO_OPERATION, pending]
end
.connect_to(sock, remote_sockaddr:) ⇒ Object
81 82 83 84 85 86 87 88 |
# File 'lib/rbgo/corun.rb', line 81
#
# Connects +sock+ to +remote_sockaddr+.
#
# Outside a CoRun fiber this calls +sock.connect(remote_sockaddr)+ directly.
# Inside one, the connect is registered with the scheduler's IO machine and
# the current fiber yields +[YIELD_IO_OPERATION, receipt]+; the value the
# fiber is resumed with is returned.
#
# @param sock [Object] socket to connect
# @param remote_sockaddr [Object] address handed through to the connect call
# @return [Object] result of +sock.connect+, or the value passed on resume
def self.connect_to(sock, remote_sockaddr:)
  return sock.connect(remote_sockaddr) unless is_in_corun_fiber?

  io_machine = Scheduler.instance.io_machine
  pending = io_machine.do_socket_connect(sock, remote_sockaddr: remote_sockaddr)
  Fiber.yield [YIELD_IO_OPERATION, pending]
end
.have_other_task_on_thread? ⇒ Boolean
20 21 22 23 |
# File 'lib/rbgo/corun.rb', line 20
#
# Reports whether any of the task queues stored in the current thread's
# +LOCAL_TASK_QUEUES+ thread variable is non-empty.
#
# Always returns a strict boolean: when no queue list has been stored for the
# thread (the variable is +nil+) the answer is +false+ rather than +nil+,
# matching the +!!+ coercion used by +is_in_corun_fiber?+.
#
# @return [Boolean] +true+ if at least one stored queue responds +false+ to
#   +empty?+, otherwise +false+
def self.have_other_task_on_thread?
  queues = Thread.current.thread_variable_get(LOCAL_TASK_QUEUES)
  # `&.` yields nil when nothing is registered; `!!` folds that into false.
  !!(queues&.any? { |q| !q.empty? })
end
.is_in_corun_fiber? ⇒ Boolean
16 17 18 |
# File 'lib/rbgo/corun.rb', line 16
#
# Tells whether the +IS_CORUN_FIBER+ marker is set (truthy) in
# +Thread.current[]+ for the caller. Ruby's +Thread#[]+ storage is
# fiber-local, so the answer applies to the currently running fiber.
#
# @return [Boolean] +true+ when the marker is truthy, +false+ otherwise
def self.is_in_corun_fiber?
  Thread.current[IS_CORUN_FIBER] ? true : false
end
.read_from(io, length: nil) ⇒ Object
42 43 44 45 46 47 48 49 50 |
# File 'lib/rbgo/corun.rb', line 42
#
# Reads from +io+.
#
# Outside a CoRun fiber this is +io.read(length)+. Inside one, a +length+ of
# zero short-circuits to an empty string without scheduling anything;
# otherwise the read is registered with the scheduler's IO machine and the
# current fiber yields +[YIELD_IO_OPERATION, receipt]+, returning whatever
# value it is resumed with.
#
# @param io [Object] source to read from
# @param length [Integer, nil] forwarded as the read length
# @return [Object] result of +io.read+, +""+, or the value passed on resume
def self.read_from(io, length: nil)
  return io.read(length) unless is_in_corun_fiber?
  return "" if length == 0

  pending = Scheduler.instance.io_machine.do_read(io, length: length)
  Fiber.yield [YIELD_IO_OPERATION, pending]
end
.read_line_from(io, sep: $/, limit: nil) ⇒ Object
52 53 54 55 56 57 58 59 60 |
# File 'lib/rbgo/corun.rb', line 52
#
# Reads one line from +io+.
#
# Outside a CoRun fiber this is +io.readline(sep, limit)+. Inside one, a
# +limit+ of zero short-circuits to an empty string; otherwise the line read
# is registered with the scheduler's IO machine and the current fiber yields
# +[YIELD_IO_OPERATION, receipt]+, returning whatever value it is resumed
# with.
#
# @param io [Object] source to read from
# @param sep [String, nil] line separator (defaults to +$/+)
# @param limit [Integer, nil] forwarded as the line length limit
# @return [Object] result of +io.readline+, +""+, or the value passed on resume
def self.read_line_from(io, sep: $/, limit: nil)
  return io.readline(sep, limit) unless is_in_corun_fiber?
  return "" if limit == 0

  pending = Scheduler.instance.io_machine.do_read_line(io, sep: sep, limit: limit)
  Fiber.yield [YIELD_IO_OPERATION, pending]
end
.read_partial_from(io, maxlen:) ⇒ Object
62 63 64 65 66 67 68 69 70 |
# File 'lib/rbgo/corun.rb', line 62
#
# Performs a partial read from +io+.
#
# Outside a CoRun fiber this is +io.readpartial(maxlen)+. Inside one, a
# +maxlen+ of zero short-circuits to an empty string; otherwise the read is
# registered with the scheduler's IO machine and the current fiber yields
# +[YIELD_IO_OPERATION, receipt]+, returning whatever value it is resumed
# with.
#
# @param io [Object] source to read from
# @param maxlen [Integer] forwarded as the maximum number of bytes
# @return [Object] result of +io.readpartial+, +""+, or the value passed on
#   resume
def self.read_partial_from(io, maxlen:)
  return io.readpartial(maxlen) unless is_in_corun_fiber?
  return "" if maxlen == 0

  pending = Scheduler.instance.io_machine.do_read_partial(io, maxlen: maxlen)
  Fiber.yield [YIELD_IO_OPERATION, pending]
end
.recv_from(sock, maxlen:, flags: 0) ⇒ Object
90 91 92 93 94 95 96 97 |
# File 'lib/rbgo/corun.rb', line 90
#
# Receives data from +sock+.
#
# Outside a CoRun fiber this is +sock.recv(maxlen, flags)+. Inside one, the
# receive is registered with the scheduler's IO machine and the current fiber
# yields +[YIELD_IO_OPERATION, receipt]+, returning whatever value it is
# resumed with.
#
# @param sock [Object] socket to receive from
# @param maxlen [Integer] forwarded as the maximum length
# @param flags [Integer] forwarded receive flags
# @return [Object] result of +sock.recv+, or the value passed on resume
def self.recv_from(sock, maxlen:, flags: 0)
  return sock.recv(maxlen, flags) unless is_in_corun_fiber?

  io_machine = Scheduler.instance.io_machine
  pending = io_machine.do_socket_recv(sock, maxlen: maxlen, flags: flags)
  Fiber.yield [YIELD_IO_OPERATION, pending]
end
.recvmsg_from(sock, maxdatalen: nil, flags: 0, maxcontrollen: nil, opts: {}) ⇒ Object
99 100 101 102 103 104 105 106 |
# File 'lib/rbgo/corun.rb', line 99
#
# Receives a message from +sock+ via recvmsg.
#
# Inside a CoRun fiber the operation is registered with the scheduler's IO
# machine and the current fiber yields +[YIELD_IO_OPERATION, receipt]+,
# returning whatever value it is resumed with. Otherwise +sock.recvmsg+ is
# called directly.
#
# @param sock [Object] socket to receive from
# @param maxdatalen [Integer, nil] forwarded as the maximum data length
# @param flags [Integer] forwarded receive flags
# @param maxcontrollen [Integer, nil] forwarded as the maximum control length
# @param opts [Hash] extra options; on the direct path they are passed to
#   +recvmsg+ as keyword arguments (e.g. +scm_rights:+)
# @return [Object] result of +sock.recvmsg+, or the value passed on resume
def self.recvmsg_from(sock, maxdatalen: nil, flags: 0, maxcontrollen: nil, opts: {})
  if is_in_corun_fiber?
    receipt = Scheduler.instance.io_machine.do_socket_recvmsg(
      sock,
      maxdatalen: maxdatalen,
      flags: flags,
      maxcontrollen: maxcontrollen,
      opts: opts
    )
    Fiber.yield [YIELD_IO_OPERATION, receipt]
  else
    # BasicSocket#recvmsg takes three positionals plus keywords; a positional
    # hash (even the default {}) raises ArgumentError on Ruby 3+, so splat it.
    sock.recvmsg(maxdatalen, flags, maxcontrollen, **opts)
  end
end
.sendmsg_to(sock, mesg, flags: 0, dest_sockaddr: nil, controls: []) ⇒ Object
108 109 110 111 112 113 114 115 |
# File 'lib/rbgo/corun.rb', line 108
#
# Sends +mesg+ over +sock+ via sendmsg.
#
# Outside a CoRun fiber this is
# +sock.sendmsg(mesg, flags, dest_sockaddr, *controls)+. Inside one, the send
# is registered with the scheduler's IO machine and the current fiber yields
# +[YIELD_IO_OPERATION, receipt]+, returning whatever value it is resumed
# with.
#
# @param sock [Object] socket to send on
# @param mesg [Object] message payload
# @param flags [Integer] forwarded send flags
# @param dest_sockaddr [Object, nil] forwarded destination address
# @param controls [Array] ancillary data, splatted on the direct path
# @return [Object] result of +sock.sendmsg+, or the value passed on resume
def self.sendmsg_to(sock, mesg, flags: 0, dest_sockaddr: nil, controls: [])
  return sock.sendmsg(mesg, flags, dest_sockaddr, *controls) unless is_in_corun_fiber?

  pending = Scheduler.instance.io_machine.do_socket_sendmsg(
    sock,
    mesg,
    flags: flags,
    dest_sockaddr: dest_sockaddr,
    controls: controls
  )
  Fiber.yield [YIELD_IO_OPERATION, pending]
end
.write_to(io, str:) ⇒ Object
117 118 119 120 121 122 123 124 |
# File 'lib/rbgo/corun.rb', line 117
#
# Writes +str+ to +io+.
#
# Outside a CoRun fiber this is +io.write(str)+. Inside one, the write is
# registered with the scheduler's IO machine and the current fiber yields
# +[YIELD_IO_OPERATION, receipt]+, returning whatever value it is resumed
# with.
#
# @param io [Object] destination to write to
# @param str [Object] data to write
# @return [Object] result of +io.write+, or the value passed on resume
def self.write_to(io, str:)
  return io.write(str) unless is_in_corun_fiber?

  pending = Scheduler.instance.io_machine.do_write(io, str: str)
  Fiber.yield [YIELD_IO_OPERATION, pending]
end
.yield_io(&blk) ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/rbgo/corun.rb', line 25
#
# Runs the given block, yielding the current fiber while it executes when
# called from a CoRun fiber.
#
# Outside a CoRun fiber the block (if any) is simply called and its value
# returned. Inside one, an +IOReceipt+ tagged +[:yield_io]+ is created and a
# +CoRun::Routine+ is started with +new_thread: true, queue_tag: :none+ to
# call the block; the block's value is stored in +receipt.res+ and
# +receipt.notify+ is invoked from an +ensure+, so it runs even if the block
# raises (in which case +res+ is never assigned). The calling fiber then
# yields +[YIELD_IO_OPERATION, receipt]+ and returns whatever value it is
# resumed with.
#
# @yield the work to run
# @return [Object] the block's value on the direct path, or the value passed
#   on resume
def self.yield_io(&blk)
  return blk&.call unless is_in_corun_fiber?

  ticket = IOReceipt.new([:yield_io])
  CoRun::Routine.new(new_thread: true, queue_tag: :none) do
    begin
      ticket.res = blk&.call
    ensure
      # Always signal completion, whether or not the block succeeded.
      ticket.notify
    end
  end
  Fiber.yield [YIELD_IO_OPERATION, ticket]
end