Class: GrpcKit::Stream

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
GrpcKit::Streams::Packable
Defined in:
lib/grpc_kit/stream.rb

Instance Method Summary collapse

Methods included from GrpcKit::Streams::Packable

#pack, #unpack, #unpacker

Constructor Details

#initialize(protobuf:, session:, stream:) ⇒ Stream

Returns a new instance of Stream.



17
18
19
20
21
# File 'lib/grpc_kit/stream.rb', line 17

# Builds a stream wrapper around its three collaborators.
#
# @param protobuf [#encode, #decode] codec used by #send and #recv
# @param session [#submit_response] session used to submit response headers
# @param stream [Object] underlying stream that data and trailers are written to
def initialize(protobuf:, session:, stream:)
  @protobuf, @session, @stream = protobuf, session, stream
end

Instance Method Details

#each ⇒ Object



23
24
25
26
27
28
29
30
# File 'lib/grpc_kit/stream.rb', line 23

# Yields every message returned by #recv until #recv returns nil.
#
# When called without a block, returns an Enumerator instead of reading
# from the stream, so no message is consumed before a consumer exists.
#
# @yieldparam data [Object] a message returned by #recv
# @return [Enumerator, nil] an Enumerator when no block is given; otherwise
#   nil once #recv returns nil
def each
  return enum_for(:each) unless block_given?

  loop do
    data = recv
    # A nil message marks the end of the stream.
    return if data.nil?

    yield(data)
  end
end

#recv(last: false, limit_size: nil) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/grpc_kit/stream.rb', line 48

# Reads one frame, validates it and decodes its payload.
#
# @param last [Boolean] forwarded to +read+ as its +last:+ keyword
# @param limit_size [Integer, nil] largest accepted value of the frame's
#   reported size; nil disables the check
# @return [Object, nil] the value returned by +@protobuf.decode+, or nil
#   when +unpack+ yields nothing
# @raise [RuntimeError] when the reported size differs from the buffer size,
#   or when the frame is flagged as compressed
# @raise [GrpcKit::Errors::ResourceExhausted] when the reported size exceeds
#   +limit_size+
# @raise [GrpcKit::Errors::Internal] when decoding raises ArgumentError
def recv(last: false, limit_size: nil)
  data = unpack(read(last: last))
  return nil unless data

  compressed, size, buf = *data

  # The size announced in the frame must match the payload actually received.
  raise "inconsistent data: #{buf}" unless size == buf.size

  if limit_size && size > limit_size
    raise GrpcKit::Errors::ResourceExhausted, "Receiving message is too large: received=#{size}, max=#{limit_size}"
  end

  raise 'compress option is unsupported' if compressed

  begin
    @protobuf.decode(buf)
  rescue ArgumentError => e
    # Same wording as the encoding error raised by #send.
    raise GrpcKit::Errors::Internal, "Error while decoding: #{e}"
  end
end

#send(data, last: false, limit_size: nil) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/grpc_kit/stream.rb', line 32

# Encodes a message, frames it and writes it to the underlying stream.
#
# @param data [Object] message handed to +@protobuf.encode+
# @param last [Boolean] forwarded to +@stream.write_send_data+
# @param limit_size [Integer, nil] largest accepted byte size of the packed
#   frame (the value returned by +pack+); nil disables the check
# @return [Object] whatever +@stream.write_send_data+ returns
# @raise [GrpcKit::Errors::Internal] when encoding raises ArgumentError
# @raise [GrpcKit::Errors::ResourceExhausted] when the packed frame is
#   larger than +limit_size+
def send(data, last: false, limit_size: nil)
  begin
    encoded = @protobuf.encode(data)
  rescue ArgumentError => e
    raise GrpcKit::Errors::Internal, "Error while encoding: #{e}"
  end

  frame = pack(encoded)

  # The limit is compared against the packed frame, not the bare payload.
  over_limit = limit_size && frame.bytesize > limit_size
  raise GrpcKit::Errors::ResourceExhausted, "Sending message is too large: send=#{frame.bytesize}, max=#{limit_size}" if over_limit

  @stream.write_send_data(frame, last: last)
end

#send_trailer(status: GrpcKit::StatusCodes::OK, msg: nil, metadata: {}) ⇒ Object



74
75
76
77
78
79
80
81
82
83
# File 'lib/grpc_kit/stream.rb', line 74

# Writes the trailers for this stream and ends writing.
#
# @param status [#to_s] status code; stored as a string under 'grpc-status'
# @param msg [String, nil] optional message stored under 'grpc-message'
# @param metadata [Hash] extra trailer entries; copied, never mutated
# @return [Object] whatever +@stream.end_write+ returns
def send_trailer(status: GrpcKit::StatusCodes::OK, msg: nil, metadata: {})
  # Work on a copy so the caller's hash (and the shared default) stay untouched.
  trailer = metadata.dup
  trailer['grpc-status'] = status.to_s
  trailer['grpc-message'] = msg if msg

  @stream.write_trailers_data(trailer)
  @stream.end_write
end

#submit_response(_header = nil, piggyback_trailer: false) ⇒ Object

TODO: use actual data



86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/grpc_kit/stream.rb', line 86

# Submits the response headers for this stream through the session.
#
# Both arguments are accepted but currently ignored: the header set is
# fixed, and merging trailers into the headers (piggybacking) is disabled.
# The original note gives the reason: ds9 does not support
# nghttp2_submit_{response|request} without a body.
#
# @param _header [Object, nil] unused
# @param piggyback_trailer [Boolean] unused
# @return [Object] whatever +@session.submit_response+ returns
def submit_response(_header = nil, piggyback_trailer: false)
  response_headers = {
    ':status' => '200',
    'content-type' => 'application/grpc',
    'accept-encoding' => 'identity',
  }

  @session.submit_response(@stream.stream_id, response_headers)
end