Class: GrpcKit::Stream::ClientStream

Inherits:
Object
  • Object
show all
Defined in:
lib/grpc_kit/stream/client_stream.rb

Defined Under Namespace

Classes: Status

Instance Method Summary collapse

Constructor Details

#initialize(transport, config, authority:, timeout: nil) ⇒ ClientStream

Returns a new instance of ClientStream

Parameters:


11
12
13
14
15
16
17
18
19
20
# File 'lib/grpc_kit/stream/client_stream.rb', line 11

def initialize(transport, config, authority:, timeout: nil)
  @transport = transport
  @config = config

  @authority = authority
  @timeout = timeout
  @deadline = timeout&.to_absolute_time

  @started = false
end

Instance Method Details

#close_and_recvArray<Object>

Returns:

  • (Array<Object>)

84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/grpc_kit/stream/client_stream.rb', line 84

def close_and_recv
  validate_if_request_start!

  @transport.close_and_flush

  data = []
  loop { data.push(do_recv) }

  if @deadline && Time.now > @deadline
    raise GrpcKit::Errors::DeadlineExceeded, @deadline
  end

  data
end

#close_and_sendObject


72
73
74
75
76
77
78
79
80
81
# File 'lib/grpc_kit/stream/client_stream.rb', line 72

def close_and_send
  validate_if_request_start!

  if @deadline && Time.now > @deadline
    raise GrpcKit::Errors::DeadlineExceeded, @deadline
  end

  # send?
  @transport.close_and_flush
end

#eachObject


50
51
52
53
54
# File 'lib/grpc_kit/stream/client_stream.rb', line 50

def each
  validate_if_request_start!

  loop { yield(do_recv) }
end

#recv_msg(last: false, blocking: true) ⇒ Object

Parameters:

  • last (Boolean)
  • blocking (Boolean)

Returns:

  • (Object)

Raises:

  • (StopIteration)

    when recving message finished


60
61
62
63
64
65
66
67
68
69
70
# File 'lib/grpc_kit/stream/client_stream.rb', line 60

def recv_msg(last: false, blocking: true)
  validate_if_request_start!

  ret = do_recv(last: last, blocking: blocking)

  if @deadline && Time.now > @deadline
    raise GrpcKit::Errors::DeadlineExceeded, @deadline
  end

  ret
end

#send_msg(data, metadata: {}, last: false) ⇒ void

This method returns an undefined value.

Parameters:

  • data (Object)
  • metadata (Hash<String,String>)
  • last (Boolean)

26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/grpc_kit/stream/client_stream.rb', line 26

def send_msg(data, metadata: {}, last: false)
  buf =
    begin
      @config.protobuf.encode(data)
    rescue ArgumentError => e
      raise GrpcKit::Errors::Internal, "Error while encoding in client: #{e}"
    end

  limit_size = @config.max_send_message_size
  if limit_size && buf.bytesize > limit_size
    raise GrpcKit::Errors::ResourceExhausted, "Sending message is too large: send=#{req.bytesize}, max=#{limit_size}"
  end

  if @deadline && Time.now > @deadline
    raise GrpcKit::Errors::DeadlineExceeded, @deadline
  end

  if @started
    @transport.write_data(buf, last: last)
  else
    start_request(buf, metadata: , last: last)
  end
end