Module: Libuv::Stream

Included in:
Pipe, TCP, TTY
Defined in:
lib/libuv/mixins/stream.rb

Constant Summary collapse

BACKLOG_ERROR =
"backlog must be an Integer".freeze
WRITE_ERROR =
"data must be a String".freeze
STREAM_CLOSED_ERROR =
"unable to write to a closed stream".freeze
CLOSED_HANDLE_ERROR =
"handle closed before accept called".freeze

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(base) ⇒ Object



5
6
7
8
9
10
11
12
# File 'lib/libuv/mixins/stream.rb', line 5

def self.included(base)
    base.define_callback function: :on_listen,      params: [:pointer, :int]
    base.define_callback function: :write_complete, params: [:pointer, :int]
    base.define_callback function: :on_shutdown,    params: [:pointer, :int]

    base.define_callback function: :on_allocate, params: [:pointer, :size_t, Ext::UvBuf.by_ref]
    base.define_callback function: :on_read,     params: [:pointer, :ssize_t, Ext::UvBuf.by_ref]
end

Instance Method Details

#listen(backlog) ⇒ Object



22
23
24
25
26
27
# File 'lib/libuv/mixins/stream.rb', line 22

def listen(backlog)
    return if @closed
    assert_type(Integer, backlog, BACKLOG_ERROR)
    error = check_result ::Libuv::Ext.listen(handle, Integer(backlog), callback(:on_listen))
    reject(error) if error
end

#progress(callback = nil, &blk) ⇒ Object



110
111
112
# File 'lib/libuv/mixins/stream.rb', line 110

def progress(callback = nil, &blk)
    @progress = callback || blk
end

#readable?Boolean

Returns:

  • (Boolean)


100
101
102
103
# File 'lib/libuv/mixins/stream.rb', line 100

def readable?
    return false if @closed
    ::Libuv::Ext.is_readable(handle) > 0
end

#shutdownObject

Shutsdown the writes on the handle waiting until the last write is complete before triggering the callback



44
45
46
47
48
# File 'lib/libuv/mixins/stream.rb', line 44

def shutdown
    return if @closed
    error = check_result ::Libuv::Ext.shutdown(::Libuv::Ext.allocate_request_shutdown, handle, callback(:on_shutdown))
    reject(error) if error
end

#start_readObject

Starts reading from the handle



30
31
32
33
34
# File 'lib/libuv/mixins/stream.rb', line 30

def start_read
    return if @closed
    error = check_result ::Libuv::Ext.read_start(handle, callback(:on_allocate), callback(:on_read))
    reject(error) if error
end

#stop_readObject

Stops reading from the handle



37
38
39
40
41
# File 'lib/libuv/mixins/stream.rb', line 37

def stop_read
    return if @closed
    error = check_result ::Libuv::Ext.read_stop(handle)
    reject(error) if error
end

#try_write(data) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/libuv/mixins/stream.rb', line 50

def try_write(data)
    assert_type(String, data, WRITE_ERROR)

    buffer1 = ::FFI::MemoryPointer.from_string(data)
    buffer  = ::Libuv::Ext.buf_init(buffer1, data.respond_to?(:bytesize) ? data.bytesize : data.size)

    result = ::Libuv::Ext.try_write(handle, buffer, 1)
    buffer1.free

    error = check_result result
    raise error if error
    return result
end

#writable?Boolean

Returns:

  • (Boolean)


105
106
107
108
# File 'lib/libuv/mixins/stream.rb', line 105

def writable?
    return false if @closed
    ::Libuv::Ext.is_writable(handle) > 0
end

#write(data) ⇒ Object Also known as: puts



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/libuv/mixins/stream.rb', line 64

def write(data)
    # NOTE:: Similar to udp.rb -> send
    deferred = @loop.defer
    if !@closed
        begin
            assert_type(String, data, WRITE_ERROR)

            buffer1 = ::FFI::MemoryPointer.from_string(data)
            buffer  = ::Libuv::Ext.buf_init(buffer1, data.respond_to?(:bytesize) ? data.bytesize : data.size)

            # local as this variable will be available until the handle is closed
            @write_callbacks ||= {}
            req = ::Libuv::Ext.allocate_request_write
            @write_callbacks[req.address] = [deferred, buffer1]
            error = check_result ::Libuv::Ext.write(req, handle, buffer, 1, callback(:write_complete, req.address))

            if error
                @write_callbacks.delete req.address
                cleanup_callbacks req.address

                ::Libuv::Ext.free(req)
                buffer1.free
                deferred.reject(error)

                reject(error)       # close the handle
            end
        rescue => e
            deferred.reject(e)  # this write exception may not be fatal
        end
    else
        deferred.reject(RuntimeError.new(STREAM_CLOSED_ERROR))
    end
    deferred.promise
end