Class: Libuv::Pipe
Constant Summary
collapse
- WRITE2_ERROR =
"data must be a String".freeze
Constants included
from Stream
Stream::BACKLOG_ERROR, Stream::CLOSED_HANDLE_ERROR, Stream::STREAM_CLOSED_ERROR, Stream::WRITE_ERROR
Constants included
from Assertions
Assertions::MSG_NO_PROC
Constants inherited
from Q::Promise
Q::Promise::MAKE_PROMISE
Instance Attribute Summary
Attributes inherited from Handle
#closed, #loop, #storage
Instance Method Summary
collapse
-
#accept(callback = nil, &blk) ⇒ Object
-
#bind(name, callback = nil, &blk) ⇒ Object
-
#check_pending(expecting = nil) ⇒ Object
-
#connect(name, callback = nil, &blk) ⇒ Object
-
#getsockname ⇒ Object
-
#initialize(loop, ipc, acceptor = nil) ⇒ Pipe
constructor
-
#open(fileno, callback = nil, &blk) ⇒ Object
-
#pending_instances=(count) ⇒ Object
-
#write2(fd, data = ".") ⇒ Object
Methods included from Stream
#listen, #progress, #readable?, #shutdown, #start_read, #stop_read, #try_write, #writable?, #write
Methods inherited from Handle
#active?, #close, #closing?, #ref, #unref
Methods included from Assertions
#assert_block, #assert_boolean, #assert_type
Methods included from Resource
#check_result, #check_result!, #resolve, #to_ptr
#resolved?, #then
Methods inherited from Q::Promise
#catch, #finally, #progress
Constructor Details
#initialize(loop, ipc, acceptor = nil) ⇒ Pipe
9
10
11
12
13
14
15
16
17
|
# File 'lib/libuv/pipe.rb', line 9
# Builds a pipe handle on the given loop.
#
# A :uv_pipe handle is allocated through Ext.create_handle and initialised
# with Ext.pipe_init, passing 1 or 0 depending on the truthiness of +ipc+.
# When an +acceptor+ is supplied and initialisation reported no error, the
# new handle is handed to Ext.accept together with the acceptor. The handle
# pointer and the (possibly nil) error are then forwarded to the superclass
# constructor.
#
# @param loop the owning loop; must respond to #handle
# @param ipc truthy to initialise the pipe with the IPC flag set
# @param acceptor optional value passed as the first argument of Ext.accept
def initialize(loop, ipc, acceptor = nil)
  @loop = loop
  @ipc = ipc

  pointer = ::Libuv::Ext.create_handle(:uv_pipe)
  ipc_flag = ipc ? 1 : 0
  failure = check_result(::Libuv::Ext.pipe_init(loop.handle, pointer, ipc_flag))

  # Only attempt the accept when initialisation itself succeeded.
  if failure.nil? && acceptor
    failure = check_result(::Libuv::Ext.accept(acceptor, pointer))
  end

  super(pointer, failure)
end
|
Instance Method Details
#accept(callback = nil, &blk) ⇒ Object
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
# File 'lib/libuv/pipe.rb', line 29
# Creates a Pipe for a pending connection and passes it to the supplied
# callable (the +callback+ argument wins over the block).
#
# Failures while creating the Pipe (including this handle already being
# closed) are logged on the loop at :info level and the callable is not
# invoked. Exceptions raised by the callable are logged at :error level.
#
# @param callback optional callable receiving the new Pipe
# @return [nil] always
def accept(callback = nil, &blk)
  client =
    begin
      raise RuntimeError, CLOSED_HANDLE_ERROR if @closed
      Pipe.new(loop, @ipc, handle)
    rescue Exception => failure
      @loop.log(:info, :pipe_accept_failed, failure)
      nil
    end
  return nil unless client

  receiver = callback || blk
  begin
    receiver.call(client)
  rescue Exception => failure
    @loop.log(:error, :pipe_accept_cb, failure)
  end
  nil
end
|
#bind(name, callback = nil, &blk) ⇒ Object
19
20
21
22
23
24
25
26
27
|
# File 'lib/libuv/pipe.rb', line 19
# Binds the pipe to +name+ and remembers the listener callable in
# @on_listen (the +callback+ argument wins over the block).
#
# Does nothing when the handle is already closed. On Windows the name is
# first passed through windows_path. If Ext.pipe_bind reports an error the
# pipe is rejected with it.
#
# @param name [String] the pipe name / path to bind to
# @param callback optional callable stored as the listen callback
# @return nil when closed or when binding succeeded, otherwise the result
#   of #reject
def bind(name, callback = nil, &blk)
  unless @closed
    @on_listen = callback || blk
    assert_type(String, name, "name must be a String")

    path = FFI::Platform.windows? ? windows_path(name) : name
    failure = check_result(::Libuv::Ext.pipe_bind(handle, path))
    reject(failure) if failure
  end
end
|
#check_pending(expecting = nil) ⇒ Object
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
|
# File 'lib/libuv/pipe.rb', line 129
# Wraps a handle that is pending on this pipe in the matching Ruby object.
#
# @param expecting optional handle type (e.g. :uv_tcp or :uv_pipe); when
#   given, it must equal the pending type
# @return [TCP, Pipe, nil] the wrapped handle, or nil when
#   Ext.pipe_pending_count reports nothing pending
# @raise [TypeError] when +expecting+ is given and differs from the pending type
# @raise [NotImplementedError] when the pending type is neither :uv_tcp nor :uv_pipe
def check_pending(expecting = nil)
  return nil unless ::Libuv::Ext.pipe_pending_count(handle) > 0

  kind = ::Libuv::Ext.pipe_pending_type(handle)
  if expecting && expecting != kind
    raise TypeError, "IPC expecting #{expecting} and received #{kind}"
  end

  case kind
  when :uv_tcp  then TCP.new(loop, handle)
  when :uv_pipe then Pipe.new(loop, @ipc, handle)
  else
    raise NotImplementedError, "IPC for handle #{kind} not supported"
  end
end
|
#connect(name, callback = nil, &blk) ⇒ Object
65
66
67
68
69
70
71
72
73
74
75
|
# File 'lib/libuv/pipe.rb', line 65
# Starts connecting this pipe to +name+ and stores the completion callable
# in @callback (the +callback+ argument wins over the block).
#
# Does nothing when the handle is already closed. On Windows the name is
# first passed through windows_path. Any exception raised while issuing
# the connect request is passed to #reject instead of propagating; a
# non-String +name+ is reported by assert_type before that guard.
#
# @param name [String] the pipe name / path to connect to
# @param callback optional callable stored as the connect callback
# @return nil when closed, otherwise the result of Ext.pipe_connect or of
#   #reject on failure
def connect(name, callback = nil, &blk)
  return if @closed

  @callback = callback || blk
  assert_type(String, name, "name must be a String")

  begin
    path = FFI::Platform.windows? ? windows_path(name) : name
    request = ::Libuv::Ext.create_request(:uv_connect)
    # The parenthesised form calls the #callback method, not the parameter.
    ::Libuv::Ext.pipe_connect(request, handle, path, callback(:on_connect))
  rescue Exception => failure
    reject(failure)
  end
end
|
#getsockname ⇒ Object
148
149
150
151
152
153
154
155
|
# File 'lib/libuv/pipe.rb', line 148
# Asks libuv for the name this pipe is bound to.
#
# A 256-byte buffer is handed to Ext.pipe_getsockname together with a
# size_t slot holding that capacity.
#
# @return [String] the buffer contents up to the first NUL byte
# @raise whatever check_result! raises when pipe_getsockname reports a
#   failure (NOTE(review): the error class is defined outside this view)
def getsockname
  size = 256
  len = FFI::MemoryPointer.new(:size_t)
  # Store the capacity at the full width of size_t. A plain 4-byte int
  # write only produces the intended value on little-endian hosts; on a
  # 64-bit big-endian host it would land in the high half of the slot.
  if FFI.type_size(:size_t) == 8
    len.put_uint64(0, size)
  else
    len.put_uint32(0, size)
  end
  buffer = FFI::MemoryPointer.new(size)
  check_result! ::Libuv::Ext.pipe_getsockname(handle, buffer, len)
  # NOTE(review): this reads up to the first NUL rather than the length
  # libuv reports back in +len+ — confirm against the libuv version in use.
  buffer.read_string
end
|
#open(fileno, callback = nil, &blk) ⇒ Object
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
# File 'lib/libuv/pipe.rb', line 47
# Attaches an existing file descriptor to this pipe, then invokes the
# stored callable with the pipe itself (the +callback+ argument wins over
# the block).
#
# A closed handle or a failure reported through check_result! is passed to
# #reject. Exceptions raised by the callable are logged on the loop at
# :error level. A non-Integer +fileno+ is reported by assert_type before
# either guard applies.
#
# @param fileno [Integer] the file descriptor to open
# @param callback optional callable invoked with this pipe once opened
# @return the callable's result, the logger's result when the callable
#   raised, or the result of #reject when opening failed
def open(fileno, callback = nil, &blk)
  @callback = callback || blk
  assert_type(Integer, fileno, "io#fileno must return an integer file descriptor")

  begin
    if @closed
      raise RuntimeError, CLOSED_HANDLE_ERROR
    end
    check_result!(::Libuv::Ext.pipe_open(handle, fileno))

    begin
      @callback.call(self)
    rescue Exception => callback_error
      @loop.log(:error, :pipe_connect_cb, callback_error)
    end
  rescue Exception => open_error
    reject(open_error)
  end
end
|
#pending_instances=(count) ⇒ Object
123
124
125
126
127
|
# File 'lib/libuv/pipe.rb', line 123
# Forwards the requested pending-instance count to Ext.pipe_pending_instances.
#
# @param count [Integer] the value handed to libuv
# @return 0 when the handle is closed, otherwise the result of
#   Ext.pipe_pending_instances
def pending_instances=(count)
  unless @closed
    assert_type(Integer, count, "count must be an Integer")
    return ::Libuv::Ext.pipe_pending_instances(handle, count)
  end

  0
end
|
#write2(fd, data = ".") ⇒ Object
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
# File 'lib/libuv/pipe.rb', line 77
# Writes +data+ to the pipe and passes the handle +fd+ along with it.
#
# Only available on pipes created with a truthy ipc flag that are not
# closed; otherwise the returned promise is rejected with a TypeError.
# Argument errors and any other exception raised while issuing the request
# reject the returned promise rather than propagating. When Ext.write2
# itself reports an error, the pipe is rejected as well.
#
# @param fd [Handle] the handle to send; its #handle is passed to libuv
# @param data [String] payload written alongside the handle
# @return the promise of a deferred created on the loop; the completion
#   callback hands it to #resolve together with the write status
def write2(fd, data = ".")
  deferred = @loop.defer
  if @ipc && !@closed
    begin
      assert_type(String, data, WRITE_ERROR)
      # WRITE2_ERROR reads "data must be a String", which misreports a bad
      # fd, so this check names the real problem.
      assert_type(Handle, fd, "fd must be a Handle".freeze)

      size = data.respond_to?(:bytesize) ? data.bytesize : data.size
      payload = FFI::MemoryPointer.from_string(data)
      buffer = ::Libuv::Ext.buf_init(payload, size)

      @write_callbacks ||= []
      callback = FFI::Function.new(:void, [:pointer, :int]) do |req, status|
        ::Libuv::Ext.free(req)
        promise = @write_callbacks.shift[0]
        resolve promise, status
      end

      # Keep the payload memory referenced next to the callback. libuv may
      # finish the write after this method returns, and an unreferenced
      # MemoryPointer could be garbage-collected (freeing the bytes) first.
      @write_callbacks << [deferred, callback, payload]

      req = ::Libuv::Ext.create_request(:uv_write)
      error = check_result ::Libuv::Ext.write2(req, handle, buffer, 1, fd.handle, callback)
      if error
        # The request never started: drop the entry queued just above.
        @write_callbacks.pop
        ::Libuv::Ext.free(req)
        deferred.reject(error)
        reject(error)
      end
    rescue Exception => e
      deferred.reject(e)
    end
  else
    deferred.reject(TypeError.new('pipe not initialized for interprocess communication'))
  end
  deferred.promise
end
|