Module: Libuv::Stream
Constant Summary collapse
- BACKLOG_ERROR =
"backlog must be an Integer"
- WRITE_ERROR =
"data must be a String"
- STREAM_CLOSED_ERROR =
"unable to write to a closed stream"
- CLOSED_HANDLE_ERROR =
"handle closed before accept called"
Class Method Summary collapse
Instance Method Summary collapse
-
#close_write ⇒ Object
These are here purely for compatibility with rack hijack IO.
- #flush ⇒ Object
- #listen(backlog) ⇒ Object
- #progress(callback = nil, &blk) ⇒ Object
-
#read(maxlen = nil, outbuf = nil) ⇒ Object
(also: #read_nonblock)
Very basic IO emulation, in no way trying to be exact.
- #readable? ⇒ Boolean
-
#shutdown ⇒ Object
Shuts down writes on the handle, waiting until the last write is complete before triggering the callback.
-
#start_read ⇒ Object
Starts reading from the handle.
-
#stop_read ⇒ Object
(also: #close_read)
Stops reading from the handle.
- #try_write(data) ⇒ Object
- #writable? ⇒ Boolean
- #write(data, wait: false) ⇒ Object (also: #puts, #write_nonblock)
Class Method Details
.included(base) ⇒ Object
7 8 9 10 11 12 13 14 |
# File 'lib/libuv/mixins/stream.rb', line 7

# Mixin hook: registers on +base+ the native callback signatures used by the
# stream methods. Registrations happen in the order listed below.
#
# @param base [Module] the including class; it must respond to +define_callback+
# @return [Object] the result of the last +define_callback+ call
def self.included(base)
  # Callbacks that receive a pointer and an integer.
  base.define_callback(function: :on_listen, params: %i[pointer int])
  base.define_callback(function: :write_complete, params: %i[pointer int])
  base.define_callback(function: :on_shutdown, params: %i[pointer int])

  # Read-path callbacks additionally receive a UvBuf by reference.
  base.define_callback(function: :on_allocate, params: [:pointer, :size_t, Ext::UvBuf.by_ref])
  base.define_callback(function: :on_read, params: [:pointer, :ssize_t, Ext::UvBuf.by_ref])
end
Instance Method Details
#close_write ⇒ Object
These are here purely for compatibility with rack hijack IO
155 |
# File 'lib/libuv/mixins/stream.rb', line 155

# Intentional no-op, present purely for compatibility with rack hijack IO.
#
# @return [nil]
def close_write
  nil
end
#flush ⇒ Object
156 157 158 159 160 161 162 |
# File 'lib/libuv/mixins/stream.rb', line 156

# Creates a fresh deferred in @flush_defer, runs +check_flush_buffer+, then
# hands the deferred's promise to +co+.
#
# NOTE(review): presumably +check_flush_buffer+ resolves @flush_defer once
# pending writes have drained, and +co+ waits on the promise; neither helper
# is visible here, so confirm against their definitions.
#
# @return [Object] whatever +co+ returns for the flush promise
# @raise [EOFError] if the stream is already closed
def flush
  raise ::EOFError, 'socket closed' if @closed

  @flush_defer = @reactor.defer
  check_flush_buffer

  flushed = @flush_defer.promise
  co flushed
end
#listen(backlog) ⇒ Object
24 25 26 27 28 29 30 |
# File 'lib/libuv/mixins/stream.rb', line 24

# Asks the native layer to start listening on this handle, registering
# +on_listen+ as the callback. Does nothing when the handle is already closed.
#
# @param backlog [Integer] backlog value passed through to the native listen call
# @return [self]
def listen(backlog)
  return self if @closed

  assert_type(Integer, backlog, BACKLOG_ERROR)

  status = ::Libuv::Ext.listen(handle, Integer(backlog), callback(:on_listen))
  failure = check_result(status)
  reject(failure) if failure

  self
end
#progress(callback = nil, &blk) ⇒ Object
125 126 127 128 |
# File 'lib/libuv/mixins/stream.rb', line 125

# Stores the handler kept in @progress. An explicit +callback+ wins over a
# block; with neither, @progress becomes nil.
#
# @param callback [#call, nil] handler object
# @yield optional block used when no +callback+ is given
# @return [self]
def progress(callback = nil, &blk)
  handler = callback || blk
  @progress = handler
  self
end
#read(maxlen = nil, outbuf = nil) ⇒ Object Also known as: read_nonblock
Very basic IO emulation, in no way trying to be exact
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/libuv/mixins/stream.rb', line 131

# Very basic IO#read emulation, in no way trying to be exact.
#
# Each call creates a new deferred in @read_defer. The first call also starts
# reading on the handle, creates the String buffer, and registers a +finally+
# hook that rejects the current @read_defer with EOFError.
#
# NOTE(review): +check_read_buffer+ is not visible here. Its truthy return
# is what installs the progress handler, so presumably it means "not yet
# satisfied"; confirm against its definition.
#
# @param maxlen [Integer, nil] forwarded to +check_read_buffer+
# @param outbuf [String, nil] forwarded to +check_read_buffer+
# @return [Object] whatever +co+ returns for the read promise
# @raise [EOFError] if the stream is already closed
def read(maxlen = nil, outbuf = nil)
  raise ::EOFError, 'socket closed' if @closed

  @read_defer = @reactor.defer

  # One-time setup on the first read.
  if @read_buffer.nil?
    start_read
    @read_buffer = String.new
    # The hook reads @read_defer when it fires, not when it is registered.
    finally { @read_defer.reject(::EOFError.new('socket closed')) }
  end

  buffer_check = check_read_buffer(maxlen, outbuf, @read_defer)
  if buffer_check
    progress do |chunk|
      @read_buffer << chunk
      check_read_buffer(maxlen, outbuf, @read_defer)
    end
  end

  co @read_defer.promise
end
#readable? ⇒ Boolean
115 116 117 118 |
# File 'lib/libuv/mixins/stream.rb', line 115

# Reports whether the native layer considers the handle readable.
#
# @return [Boolean] false when the handle is closed; otherwise whether
#   +Ext.is_readable+ returned a value greater than zero
def readable?
  !@closed && ::Libuv::Ext.is_readable(handle) > 0
end
#shutdown ⇒ Object
Shuts down writes on the handle, waiting until the last write is complete before triggering the callback
50 51 52 53 54 55 56 |
# File 'lib/libuv/mixins/stream.rb', line 50

# Shuts down writes on the handle, waiting until the last write is complete
# before triggering the +on_shutdown+ callback. Does nothing when the handle
# is already closed.
#
# If the native call reports an error, the request is released here before the
# error is passed to +reject+, mirroring the failure path of #write.
#
# @return [self]
def shutdown
  return self if @closed

  req = ::Libuv::Ext.allocate_request_shutdown
  error = check_result ::Libuv::Ext.shutdown(req, handle, callback(:on_shutdown, req.address))

  if error
    # NOTE(review): per libuv, a shutdown that fails synchronously never
    # invokes its callback, so nothing else would release the request or its
    # callback registration. Confirm on_shutdown is the only other release point.
    cleanup_callbacks req.address
    ::Libuv::Ext.free(req)
    reject(error)
  end

  self
end
#start_read ⇒ Object
Starts reading from the handle
33 34 35 36 37 38 |
# File 'lib/libuv/mixins/stream.rb', line 33

# Starts reading from the handle, registering +on_allocate+ and +on_read+ as
# the native callbacks. Does nothing when the handle is already closed.
#
# @return [self]
def start_read
  return self if @closed

  status = ::Libuv::Ext.read_start(handle, callback(:on_allocate), callback(:on_read))
  failure = check_result(status)
  reject(failure) if failure

  self
end
#stop_read ⇒ Object Also known as: close_read
Stops reading from the handle
41 42 43 44 45 46 |
# File 'lib/libuv/mixins/stream.rb', line 41

# Stops reading from the handle. Does nothing when the handle is already
# closed.
#
# @return [self]
def stop_read
  return self if @closed

  status = ::Libuv::Ext.read_stop(handle)
  failure = check_result(status)
  reject(failure) if failure

  self
end
#try_write(data) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/libuv/mixins/stream.rb', line 58

# Attempts an immediate write of +data+ through the native +try_write+ call,
# without queuing a write request.
#
# The temporary native buffer is released in an +ensure+ block, so it is
# freed even when the native call raises.
#
# @param data [String] bytes to write; the length is taken from +bytesize+
# @return [Integer] the value returned by the native +try_write+ call
# @raise [Object] whatever error +check_result+ produces for that value
def try_write(data)
  assert_type(String, data, WRITE_ERROR)

  buffer1 = ::FFI::MemoryPointer.from_string(data)
  begin
    # Strings always respond to bytesize, so no size fallback is needed.
    buffer = ::Libuv::Ext.buf_init(buffer1, data.bytesize)
    result = ::Libuv::Ext.try_write(handle, buffer, 1)
  ensure
    buffer1.free
  end

  error = check_result result
  raise error if error

  result
end
#writable? ⇒ Boolean
120 121 122 123 |
# File 'lib/libuv/mixins/stream.rb', line 120

# Reports whether the native layer considers the handle writable.
#
# @return [Boolean] false when the handle is closed; otherwise whether
#   +Ext.is_writable+ returned a value greater than zero
def writable?
  !@closed && ::Libuv::Ext.is_writable(handle) > 0
end
#write(data, wait: false) ⇒ Object Also known as: puts, write_nonblock
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/libuv/mixins/stream.rb', line 72

# Queues +data+ for writing on the handle and tracks the outcome with a
# deferred created from the reactor.
#
# On a closed stream the deferred is rejected with a RuntimeError. Exceptions
# raised while setting up the write reject the deferred instead of
# propagating.
#
# NOTE(review): +co+ is not visible here; presumably it waits on the promise.
#
# @param data [String] bytes to write
# @param wait [Boolean, Symbol] +:promise+ returns the write promise; any other
#   truthy value passes the promise to +co+ before returning; false returns
#   immediately
# @return [self, Object] the promise when +wait+ is +:promise+, otherwise self
def write(data, wait: false)
  # NOTE:: Similar to udp.rb -> send
  deferred = @reactor.defer

  if @closed
    deferred.reject(RuntimeError.new(STREAM_CLOSED_ERROR))
  else
    begin
      assert_type(String, data, WRITE_ERROR)

      payload = ::FFI::MemoryPointer.from_string(data)
      buffer = ::Libuv::Ext.buf_init(payload, data.bytesize)

      # local as this variable will be available until the handle is closed
      @write_callbacks ||= {}
      req = ::Libuv::Ext.allocate_request_write
      @write_callbacks[req.address] = [deferred, payload]

      status = ::Libuv::Ext.write(req, handle, buffer, 1, callback(:write_complete, req.address))
      error = check_result(status)

      if error
        # Undo the bookkeeping for a request that was never queued.
        @write_callbacks.delete(req.address)
        cleanup_callbacks(req.address)
        ::Libuv::Ext.free(req)
        payload.free
        deferred.reject(error)

        reject(error) # close the handle
      end
    rescue => e
      deferred.reject(e) # this write exception may not be fatal
    end
  end

  return deferred.promise if wait == :promise

  co deferred.promise if wait
  self
end