Class: GrpcKit::Stream::ServerStream
- Inherits:
-
Object
- Object
- GrpcKit::Stream::ServerStream
- Defined in:
- lib/grpc_kit/stream/server_stream.rb
Instance Method Summary
- #each ⇒ Object
-
#initialize(transport) ⇒ ServerStream
constructor
A new instance of ServerStream.
- #invoke(rpc) ⇒ Object
- #recv_msg(protobuf, last: false, limit_size: nil) ⇒ Object
- #send_msg(data, protobuf, last: false, limit_size: nil, initial_metadata: {}, trailing_metadata: {}) ⇒ Object
- #send_status(data: nil, status: GrpcKit::StatusCodes::OK, msg: nil, metadata: {}) ⇒ Object
Constructor Details
#initialize(transport) ⇒ ServerStream
Returns a new instance of ServerStream.
9 10 11 12 |
# File 'lib/grpc_kit/stream/server_stream.rb', line 9
# Creates a server stream bound to the given transport.
#
# The stream begins in the "not started" state (@started is false).
#
# @param transport [Object] stored in @transport; the other methods of this
#   class read from and write to it
def initialize(transport)
  @transport = transport
  @started = false
end
Instance Method Details
#each ⇒ Object
73 74 75 |
# File 'lib/grpc_kit/stream/server_stream.rb', line 73
# Repeatedly yields the result of `recv` to the caller's block.
#
# Kernel#loop stops quietly once StopIteration is raised inside it, which is
# how the iteration ends.
#
# NOTE(review): `recv` is not among the methods listed for this class here —
# confirm it is defined elsewhere.
#
# @yield [Object] each value returned by `recv`
def each
  loop do
    yield(recv)
  end
end
#invoke(rpc) ⇒ Object
14 15 16 17 18 19 20 21 22 |
# File 'lib/grpc_kit/stream/server_stream.rb', line 14
# Runs the RPC handler against this stream and turns raised errors into a
# status sent through #send_status.
#
# - GrpcKit::Errors::BadStatus: its code and reason are sent as the status.
# - Any other StandardError: GrpcKit::StatusCodes::UNKNOWN is sent with the
#   exception message.
#
# Both error paths log the exception at debug level first.
#
# @param rpc [#invoke] handler called with this stream and the metadata of
#   the headers received from the transport
# @return [Object] the handler's return value, or the result of #send_status
#   when an error was rescued
def invoke(rpc)
  rpc.invoke(self, metadata: @transport.recv_headers.metadata)
rescue GrpcKit::Errors::BadStatus => e
  GrpcKit.logger.debug(e)
  send_status(status: e.code, msg: e.reason, metadata: {}) # TODO: metadata should be set
rescue StandardError => e
  GrpcKit.logger.debug(e)
  send_status(status: GrpcKit::StatusCodes::UNKNOWN, msg: e.message, metadata: {})
end
#recv_msg(protobuf, last: false, limit_size: nil) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/grpc_kit/stream/server_stream.rb', line 45
# Reads one message from the transport, validates it and decodes it.
#
# The transport's read_data result is destructured into three values:
# a compressed flag, a declared size and the message body.
#
# @param protobuf [#decode] decoder invoked with the message body
# @param last [Boolean] forwarded to the transport's read_data
# @param limit_size [Integer, nil] largest accepted declared size; nil
#   disables the check
# @return [Object] whatever protobuf.decode returns
# @raise [StopIteration] when the transport returns nil or the body is nil
# @raise [RuntimeError] when the declared size differs from the body size, or
#   when the compressed flag is set
# @raise [GrpcKit::Errors::ResourceExhausted] when the declared size exceeds
#   limit_size
# @raise [GrpcKit::Errors::Internal] when decoding raises ArgumentError
def recv_msg(protobuf, last: false, limit_size: nil)
  data = @transport.read_data(last: last)
  raise StopIteration if data.nil?

  compressed, size, buf = *data

  # This guard must precede buf.size below: with the check placed after the
  # size comparison a nil body fails with NoMethodError and never reaches it.
  raise StopIteration if buf.nil?

  unless size == buf.size
    raise "inconsistent data: #{buf}"
  end

  if limit_size && size > limit_size
    raise GrpcKit::Errors::ResourceExhausted, "Receving message is too large: recevied=#{size}, max=#{limit_size}"
  end

  if compressed
    raise 'compress option is unsupported'
  end

  begin
    protobuf.decode(buf)
  rescue ArgumentError => e
    raise GrpcKit::Errors::Internal, "Error while decoding in server: #{e}"
  end
end
#send_msg(data, protobuf, last: false, limit_size: nil, initial_metadata: {}, trailing_metadata: {}) ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/grpc_kit/stream/server_stream.rb', line 24
# Encodes a message and sends it by one of three routes:
#
# - last is true: via #send_status, with the encoded bytes as data and
#   trailing_metadata as metadata;
# - the stream is already started (@started): via the transport's write_data;
# - otherwise: via start_response, with initial_metadata as metadata.
#
# @param data [Object] message handed to protobuf.encode
# @param protobuf [#encode] encoder producing the bytes to send
# @param last [Boolean] whether this is the final message
# @param limit_size [Integer, nil] largest accepted encoded byte size; nil
#   disables the check
# @param initial_metadata [Hash] passed to start_response when the stream has
#   not been started
# @param trailing_metadata [Hash] passed to #send_status when last is true
# @raise [GrpcKit::Errors::Internal] when encoding raises ArgumentError
# @raise [GrpcKit::Errors::ResourceExhausted] when the encoded message is
#   larger than limit_size
def send_msg(data, protobuf, last: false, limit_size: nil, initial_metadata: {}, trailing_metadata: {})
  buf =
    begin
      protobuf.encode(data)
    rescue ArgumentError => e
      raise GrpcKit::Errors::Internal, "Error while encoding in server: #{e}"
    end

  if limit_size && buf.bytesize > limit_size
    # Report the size of the encoded buffer; there is no `req` local in this
    # method, so interpolating it raised NameError instead of this error.
    raise GrpcKit::Errors::ResourceExhausted, "Sending message is too large: send=#{buf.bytesize}, max=#{limit_size}"
  end

  if last
    send_status(data: buf, metadata: trailing_metadata)
  elsif @started
    @transport.write_data(buf)
  else
    start_response(buf, metadata: initial_metadata)
  end
end
#send_status(data: nil, status: GrpcKit::StatusCodes::OK, msg: nil, metadata: {}) ⇒ Object
77 78 79 80 81 82 |
# File 'lib/grpc_kit/stream/server_stream.rb', line 77
# Finishes the response: optionally writes a final data frame, writes the
# trailers, and starts the response if it has not been started yet.
#
# @param data [String, nil] when given, written to the transport with
#   last: true before the trailers
# @param status [Object] status code passed to write_trailers; defaults to
#   GrpcKit::StatusCodes::OK
# @param msg [String, nil] status message passed to write_trailers
# @param metadata [Hash] metadata passed to write_trailers
def send_status(data: nil, status: GrpcKit::StatusCodes::OK, msg: nil, metadata: {})
  @transport.write_data(data, last: true) if data

  # Pass metadata through: with the third argument left empty the call still
  # parses (trailing comma) but the caller's metadata is silently dropped.
  write_trailers(status, msg, metadata)

  start_response unless @started
end