Class: GrpcKit::Stream
Instance Method Summary
collapse
#pack, #unpack, #unpacker
Constructor Details
#initialize(protobuf:, session:, stream:) ⇒ Stream
Returns a new instance of Stream.
17
18
19
20
21
|
# File 'lib/grpc_kit/stream.rb', line 17
def initialize(protobuf:, session:, stream:)
@protobuf = protobuf
@session = session
@stream = stream
end
|
Instance Method Details
#each ⇒ Object
23
24
25
26
27
28
29
30
|
# File 'lib/grpc_kit/stream.rb', line 23
def each
loop do
data = recv
return if data.nil?
yield(data)
end
end
|
#recv(last: false, limit_size: nil) ⇒ Object
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
# File 'lib/grpc_kit/stream.rb', line 48
def recv(last: false, limit_size: nil)
data = unpack(read(last: last))
return nil unless data
compressed, size, buf = *data
unless size == buf.size
raise "inconsistent data: #{buf}"
end
if limit_size && size > limit_size
raise GrpcKit::Errors::ResourceExhausted, "Receving message is too large: recevied=#{size}, max=#{limit_size}"
end
if compressed
raise 'compress option is unsupported'
end
begin
@protobuf.decode(buf)
rescue ArgumentError => e
raise GrpcKit::Errors::Internal, "Error while decoding #{e}"
end
end
|
#send(data, last: false, limit_size: nil) ⇒ Object
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
# File 'lib/grpc_kit/stream.rb', line 32
def send(data, last: false, limit_size: nil)
b =
begin
@protobuf.encode(data)
rescue ArgumentError => e
raise GrpcKit::Errors::Internal, "Error while encoding: #{e}"
end
req = pack(b)
if limit_size && req.bytesize > limit_size
raise GrpcKit::Errors::ResourceExhausted, "Sending message is too large: send=#{req.bytesize}, max=#{limit_size}"
end
@stream.write_send_data(req, last: last)
end
|
#send_trailer(status: GrpcKit::StatusCodes::OK, msg: nil, metadata: {}) ⇒ Object
74
75
76
77
78
79
80
81
82
83
|
# File 'lib/grpc_kit/stream.rb', line 74
def send_trailer(status: GrpcKit::StatusCodes::OK, msg: nil, metadata: {})
trailer = metadata.dup
trailer['grpc-status'] = status.to_s
if msg
trailer['grpc-message'] = msg
end
@stream.write_trailers_data(trailer)
@stream.end_write
end
|
#submit_response(_header = nil, piggyback_trailer: false) ⇒ Object
86
87
88
89
90
91
92
93
94
95
96
97
98
|
# File 'lib/grpc_kit/stream.rb', line 86
def submit_response( = nil, piggyback_trailer: false)
= { ':status' => '200', 'content-type' => 'application/grpc' }
['accept-encoding'] = 'identity'
@session.submit_response(@stream.stream_id, )
end
|