Class: GrpcKit::Stream::ServerStream
- Inherits:
-
Object
- Object
- GrpcKit::Stream::ServerStream
- Defined in:
- lib/grpc_kit/stream/server_stream.rb
Instance Method Summary
-
#initialize(transport) ⇒ ServerStream
constructor
A new instance of ServerStream.
- #invoke(rpc) ⇒ void
- #recv_msg(codec, last: false, limit_size: nil) ⇒ Object
- #send_msg(data, codec, last: false, limit_size: nil, initial_metadata: {}, trailing_metadata: {}) ⇒ void
- #send_status(data: nil, status: GrpcKit::StatusCodes::OK, msg: nil, metadata: {}) ⇒ void
Constructor Details
#initialize(transport) ⇒ ServerStream
Returns a new instance of ServerStream.
9 10 11 12 |
# File 'lib/grpc_kit/stream/server_stream.rb', line 9

# Creates a server-side stream bound to the given transport.
#
# @param transport [Object] transport object kept for use by the other
#   stream methods (they call methods such as read_data and write_data on it)
def initialize(transport)
  @transport = transport
  # Starts out false; send_msg and send_status branch on this flag.
  @started = false
end
Instance Method Details
#invoke(rpc) ⇒ void
This method returns an undefined value.
15 16 17 18 19 20 21 22 23 |
# File 'lib/grpc_kit/stream/server_stream.rb', line 15

# Runs an RPC handler against this stream and reports failures to the peer.
#
# The handler receives this stream and the metadata taken from the headers
# read off the transport. Any error raised by the handler is logged at debug
# level and converted into a status sent through send_status.
#
# @param rpc [#invoke] handler called as rpc.invoke(stream, metadata: ...)
# @return [void]
def invoke(rpc)
  rpc.invoke(self, metadata: @transport.recv_headers.metadata)
rescue GrpcKit::Errors::BadStatus => e
  # The error carries its own status code and reason; forward them as-is.
  GrpcKit.logger.debug(e)
  send_status(status: e.code, msg: e.reason, metadata: {}) # TODO: metadata should be set
rescue StandardError => e
  # Anything else is reported as UNKNOWN with the exception message.
  GrpcKit.logger.debug(e)
  send_status(status: GrpcKit::StatusCodes::UNKNOWN, msg: e.message, metadata: {})
end
#recv_msg(codec, last: false, limit_size: nil) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/grpc_kit/stream/server_stream.rb', line 58

# Reads one message frame from the transport and decodes it.
#
# @param codec [#decode] object whose decode method turns the raw payload
#   into the returned value
# @param last [Boolean] forwarded to the transport's read_data
# @param limit_size [Integer, nil] maximum accepted declared size; nil
#   disables the check
# @return [Object] whatever codec.decode returns for the payload
# @raise [StopIteration] when the transport returns nil
# @raise [RuntimeError] when the declared size does not match the payload,
#   or when the frame is flagged as compressed
# @raise [GrpcKit::Errors::ResourceExhausted] when the declared size exceeds
#   limit_size
# @raise [GrpcKit::Errors::Internal] when the codec raises ArgumentError
def recv_msg(codec, last: false, limit_size: nil)
  data = @transport.read_data(last: last)
  raise StopIteration if data.nil?

  compressed, size, buf = *data

  # Compare against the byte length, not the character count, so a payload
  # held in a multibyte encoding is not rejected by mistake.
  raise "inconsistent data: #{buf}" unless size == buf.bytesize

  if limit_size && size > limit_size
    raise GrpcKit::Errors::ResourceExhausted, "Receving message is too large: recevied=#{size}, max=#{limit_size}"
  end

  raise 'compress option is unsupported' if compressed

  begin
    codec.decode(buf)
  rescue ArgumentError => e
    raise GrpcKit::Errors::Internal, "Error while decoding in server: #{e}"
  end
end
#send_msg(data, codec, last: false, limit_size: nil, initial_metadata: {}, trailing_metadata: {}) ⇒ void
This method returns an undefined value.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/grpc_kit/stream/server_stream.rb', line 32

# Encodes a message and sends it to the peer.
#
# Depending on the stream state the encoded payload is handed to
# send_status (final message), written directly to the transport (response
# already started), or passed to start_response (first message).
#
# @param data [Object] message passed to codec.encode
# @param codec [#encode] object whose encode method produces the payload
# @param last [Boolean] when true the payload is sent via send_status
#   together with trailing_metadata
# @param limit_size [Integer, nil] maximum encoded size in bytes; nil
#   disables the check
# @param initial_metadata [Hash] passed to start_response when the response
#   has not been started yet
# @param trailing_metadata [Hash] passed to send_status when last is true
# @return [void]
# @raise [GrpcKit::Errors::Internal] when the codec raises ArgumentError or
#   TypeError
# @raise [GrpcKit::Errors::ResourceExhausted] when the encoded payload is
#   larger than limit_size
def send_msg(data, codec, last: false, limit_size: nil, initial_metadata: {}, trailing_metadata: {})
  buf =
    begin
      codec.encode(data)
    rescue ArgumentError, TypeError => e
      raise GrpcKit::Errors::Internal, "Error while encoding in server: #{e}"
    end

  if limit_size && buf.bytesize > limit_size
    # Report the size of the encoded payload that was actually checked.
    raise GrpcKit::Errors::ResourceExhausted, "Sending message is too large: send=#{buf.bytesize}, max=#{limit_size}"
  end

  if last
    send_status(data: buf, metadata: trailing_metadata)
  elsif @started
    @transport.write_data(buf)
  else
    start_response(buf, metadata: initial_metadata)
  end
end
#send_status(data: nil, status: GrpcKit::StatusCodes::OK, msg: nil, metadata: {}) ⇒ void
This method returns an undefined value.
88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/grpc_kit/stream/server_stream.rb', line 88

# Finishes the stream by sending the final status as trailers.
#
# @param data [Object, nil] optional final payload; when given it is written
#   to the transport with last: true before the write side is ended
# @param status [Object] status code passed to build_trailers
#   (defaults to GrpcKit::StatusCodes::OK)
# @param msg [String, nil] status message passed to build_trailers
# @param metadata [Hash] metadata passed to build_trailers
# @return [void]
def send_status(data: nil, status: GrpcKit::StatusCodes::OK, msg: nil, metadata: {})
  t = build_trailers(status, msg, metadata)
  @transport.write_data(data, last: true) if data

  @transport.end_write
  if @started
    # Response already started: only the trailers remain to be written.
    @transport.write_trailers(t)
  elsif data
    # A payload was queued but the response was never started: register the
    # trailers first, then start the response.
    @transport.write_trailers(t)
    start_response
  else
    # Nothing was ever sent: the trailers go out through send_headers.
    send_headers(trailers: t)
  end
end