Class: GrpcKit::Stream::ClientStream

Inherits:
Object
  • Object
show all
Defined in:
lib/grpc_kit/stream/client_stream.rb

Defined Under Namespace

Classes: Status

Instance Method Summary collapse

Constructor Details

#initialize(transport, config, authority:, timeout: nil) ⇒ ClientStream

Returns a new instance of ClientStream.

Parameters:



11
12
13
14
15
16
17
18
19
20
# File 'lib/grpc_kit/stream/client_stream.rb', line 11

# Stores the collaborators and marks the stream as not yet started.
#
# @param transport [Object] kept as @transport
# @param config [Object] kept as @config
# @param authority [Object] kept as @authority
# @param timeout [#to_absolute_time, nil] optional timeout; when present, its
#   +to_absolute_time+ result is kept as @deadline
def initialize(transport, config, authority:, timeout: nil)
  @transport, @config = transport, config
  @authority, @timeout = authority, timeout

  # Safe navigation keeps @deadline nil when no timeout was supplied.
  @deadline = timeout&.to_absolute_time
  @started = false
end

Instance Method Details

#close_and_recv ⇒ Object

Returns:

  • (Object)


79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/grpc_kit/stream/client_stream.rb', line 79

# Closes and flushes the transport, then receives with +last: true+.
#
# The deadline is only examined after the receive has returned.
#
# @return [Object] the value returned by +do_recv(last: true)+
# @raise [GrpcKit::Errors::DeadlineExceeded] when a deadline is set and
#   the current time is past it
def close_and_recv
  validate_if_request_start!
  @transport.close_and_flush

  response = do_recv(last: true)

  deadline_passed = @deadline && Time.now > @deadline
  raise GrpcKit::Errors::DeadlineExceeded, @deadline if deadline_passed

  response
end

#close_and_send ⇒ Object



67
68
69
70
71
72
73
74
75
76
# File 'lib/grpc_kit/stream/client_stream.rb', line 67

# Closes and flushes the transport, provided the deadline has not passed.
#
# @return [Object] the value returned by +@transport.close_and_flush+
# @raise [GrpcKit::Errors::DeadlineExceeded] when a deadline is set and
#   the current time is past it; the transport is not touched in that case
def close_and_send
  validate_if_request_start!

  raise GrpcKit::Errors::DeadlineExceeded, @deadline if @deadline && Time.now > @deadline

  # send? (open question carried over from the original source)
  @transport.close_and_flush
end

#recv_msg(last: false, blocking: true) ⇒ Object

This method is not thread safe, never call from multiple threads at once.

Parameters:

  • last (Boolean) (defaults to: false)
  • blocking (Boolean) (defaults to: true)

Returns:

  • (Object)

Raises:

  • (StopIteration)

    when receiving messages has finished



55
56
57
58
59
60
61
62
63
64
65
# File 'lib/grpc_kit/stream/client_stream.rb', line 55

# Receives via +do_recv+ and then enforces the deadline.
#
# Per the accompanying API docs this method is not thread safe and raises
# StopIteration once receiving has finished.
#
# @param last [Boolean] forwarded to +do_recv+
# @param blocking [Boolean] forwarded to +do_recv+
# @return [Object] the value returned by +do_recv+
# @raise [GrpcKit::Errors::DeadlineExceeded] when a deadline is set and
#   the current time is past it after the receive
def recv_msg(last: false, blocking: true)
  validate_if_request_start!

  message = do_recv(last: last, blocking: blocking)

  expired = @deadline && Time.now > @deadline
  return message unless expired

  raise GrpcKit::Errors::DeadlineExceeded, @deadline
end

#send_msg(data, metadata: {}, last: false) ⇒ void

This method returns an undefined value.

Parameters:

  • data (Object)
  • metadata (Hash<String,String>) (defaults to: {})
  • last (Boolean) (defaults to: false)


26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/grpc_kit/stream/client_stream.rb', line 26

# Encodes +data+ with the configured codec and sends it on the stream.
#
# When @started is falsy the request is begun through +start_request+;
# otherwise the encoded bytes are written to the transport.
#
# @param data [Object] message handed to +@config.codec.encode+
# @param metadata [Hash<String,String>] passed to +start_request+ only
# @param last [Boolean] forwarded to +start_request+ / +write_data+
# @return [void]
# @raise [GrpcKit::Errors::Internal] when the codec raises ArgumentError
# @raise [GrpcKit::Errors::ResourceExhausted] when the encoded message is
#   larger than +@config.max_send_message_size+
# @raise [GrpcKit::Errors::DeadlineExceeded] when a deadline is set and
#   the current time is past it
def send_msg(data, metadata: {}, last: false)
  buf =
    begin
      @config.codec.encode(data)
    rescue ArgumentError => e
      raise GrpcKit::Errors::Internal, "Error while encoding in client: #{e}"
    end

  # A nil limit disables the size check.
  limit_size = @config.max_send_message_size
  if limit_size && buf.bytesize > limit_size
    # Report the encoded size; the previous `req` reference was undefined here.
    raise GrpcKit::Errors::ResourceExhausted, "Sending message is too large: send=#{buf.bytesize}, max=#{limit_size}"
  end

  if @deadline && Time.now > @deadline
    raise GrpcKit::Errors::DeadlineExceeded, @deadline
  end

  if @started
    @transport.write_data(buf, last: last)
  else
    start_request(buf, metadata: metadata, last: last)
  end
end