Class: GRPC::BidiCall

Inherits:
Object
Includes:
Core::CallOps, Core::StatusCodes, Core::TimeConsts
Defined in:
src/ruby/lib/grpc/generic/bidi_call.rb

Overview

The BidiCall class orchestrates the execution of a bidirectional (BiDi) stream on a client or server.

Constant Summary

Constants included from Core::StatusCodes

Core::StatusCodes::ABORTED, Core::StatusCodes::ALREADY_EXISTS, Core::StatusCodes::CANCELLED, Core::StatusCodes::DATA_LOSS, Core::StatusCodes::DEADLINE_EXCEEDED, Core::StatusCodes::FAILED_PRECONDITION, Core::StatusCodes::INTERNAL, Core::StatusCodes::INVALID_ARGUMENT, Core::StatusCodes::NOT_FOUND, Core::StatusCodes::OK, Core::StatusCodes::OUT_OF_RANGE, Core::StatusCodes::PERMISSION_DENIED, Core::StatusCodes::RESOURCE_EXHAUSTED, Core::StatusCodes::UNAUTHENTICATED, Core::StatusCodes::UNAVAILABLE, Core::StatusCodes::UNIMPLEMENTED, Core::StatusCodes::UNKNOWN

Instance Method Summary

Methods included from Core::TimeConsts

from_relative_time

Constructor Details

#initialize(call, marshal, unmarshal, metadata_received: false, req_view: nil) ⇒ BidiCall

Creates a BidiCall.

BidiCall should only be created after a call is accepted. That means different things on a client and a server. On the client, the call is accepted after call.invoke. On the server, this is after call.accept.

#initialize cannot determine whether the call has been accepted, so if an unaccepted call is passed in, the error will not surface until the BidiCall is actually run (#run_on_client or #run_on_server).

deadline is the absolute deadline for the call.

Parameters:

  • call (Call)

    the call used by the ActiveCall

  • marshal (Function)

    f(obj)->string that marshals requests

  • unmarshal (Function)

    f(string)->obj that unmarshals responses

  • metadata_received (true|false) (defaults to: false)

    indicates if metadata has already been received. Should always be true for server calls
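
The marshal and unmarshal parameters are plain callables with the shapes f(obj)->string and f(string)->obj. As a minimal sketch of that contract, the example below uses JSON and a made-up EchoMsg struct; both are assumptions for illustration only, and in real gRPC services these callables usually come from the generated message classes rather than being written by hand.

```ruby
require 'json'

# Hypothetical message type, used only for this illustration.
EchoMsg = Struct.new(:text)

# marshal: f(obj) -> string
marshal = ->(msg) { JSON.generate('text' => msg.text) }

# unmarshal: f(string) -> obj
unmarshal = ->(bytes) { EchoMsg.new(JSON.parse(bytes)['text']) }

# Round-trip a message through both callables.
wire = marshal.call(EchoMsg.new('hello'))
round_trip = unmarshal.call(wire)
```

The only property the sketch relies on is that unmarshal reverses marshal, so a message survives a round trip through the wire format.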



# File 'src/ruby/lib/grpc/generic/bidi_call.rb', line 44

def initialize(call, marshal, unmarshal, metadata_received: false,
               req_view: nil)
  fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
  @call = call
  @marshal = marshal
  @op_notifier = nil  # signals completion on clients
  @unmarshal = unmarshal
  @metadata_received = metadata_received
  @reads_complete = false
  @writes_complete = false
  @complete = false
  @done_mutex = Mutex.new
  @req_view = req_view
end

Instance Method Details

#read_next_loop(finalize_stream, is_client = false) ⇒ Object

Read the next stream iteration

Parameters:

  • finalize_stream (Proc)

    callback invoked once all reads have completed.

  • is_client (Boolean) (defaults to: false)

    true if this is a client request, false if it is a server request



# File 'src/ruby/lib/grpc/generic/bidi_call.rb', line 114

def read_next_loop(finalize_stream, is_client = false)
  read_loop(finalize_stream, is_client: is_client)
end

#run_on_client(requests, set_input_stream_done, set_output_stream_done, &blk) ⇒ Object

Begins orchestration of the Bidi stream for a client sending requests.

The method either returns an Enumerator of the responses, or accepts a block that can be invoked with each response.

Parameters:

  • requests

    the Enumerable of requests to send

  • set_input_stream_done (Proc)

    called back when we’re done reading the input stream

  • set_output_stream_done (Proc)

    called back when we’re done sending data on the output stream

Returns:

  • an Enumerator of responses to yield (when no block is given)
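
The "Enumerator or block" behavior, combined with writing on a background thread, can be sketched without gRPC at all. ToyBidiStream below is a hypothetical stand-in, not the library's implementation: a Queue plays the role of the wire, and the pretend peer simply echoes each request in upper case.

```ruby
# Hypothetical stand-in for a BiDi stream; not part of the gRPC API.
class ToyBidiStream
  DONE = Object.new # sentinel marking the end of the response stream

  attr_reader :writer

  def initialize
    @wire = Queue.new
  end

  # Same argument order as run_on_client: requests, then the two callbacks.
  def run(requests, set_input_stream_done, set_output_stream_done, &blk)
    # Writes happen on a background thread.
    @writer = Thread.new do
      requests.each { |r| @wire << r.upcase } # pretend the peer echoes in caps
      @wire << DONE
      set_output_stream_done.call
    end
    # Reads happen on the caller's thread (or lazily, via the Enumerator).
    read_loop(set_input_stream_done, &blk)
  end

  def read_loop(set_input_stream_done)
    # Without a block, hand back an Enumerator over the same method.
    return enum_for(:read_loop, set_input_stream_done) unless block_given?
    loop do
      msg = @wire.pop
      break if msg.equal?(DONE)
      yield msg
    end
    set_input_stream_done.call
  end
end

# Enumerator form: nothing is read until the Enumerator is consumed.
flags = { reads_done: false, writes_done: false }
stream = ToyBidiStream.new
responses = stream.run(%w[a b c],
                       -> { flags[:reads_done] = true },
                       -> { flags[:writes_done] = true })
collected = responses.to_a
stream.writer.join

# Block form: each response is yielded as it arrives.
seen = []
ToyBidiStream.new.run(%w[x y], -> {}, -> {}) { |resp| seen << resp }
```

In the Enumerator form the read side is lazy, so the "done reading" callback only fires once the caller has drained the Enumerator.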



# File 'src/ruby/lib/grpc/generic/bidi_call.rb', line 70

def run_on_client(requests,
                  set_input_stream_done,
                  set_output_stream_done,
                  &blk)
  @enq_th = Thread.new do
    write_loop(requests, set_output_stream_done: set_output_stream_done)
  end
  read_loop(set_input_stream_done, &blk)
end

#run_on_server(gen_each_reply, requests) ⇒ Object

Begins orchestration of the Bidi stream for a server generating replies.

N.B. gen_each_reply is a func(Enumerable<Requests>)

It takes an enumerable of requests as an arg, in case there is a relationship between the stream of requests and the stream of replies.

This does not mean that there must necessarily be one. E.g., the replies produced by gen_each_reply could ignore the received requests.

Parameters:

  • gen_each_reply (Proc)

    generates the BiDi stream replies.

  • requests (Enumerable)

    The enumerable of requests to run
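
The arity-based dispatch on gen_each_reply can be tried in isolation. In the sketch below, dispatch, ReqView, and the three lambdas are hypothetical names invented for illustration; only the arity check itself mirrors the method source shown for run_on_server.

```ruby
# Hypothetical request view; stands in for whatever @req_view holds.
ReqView = Struct.new(:peer)

# Calls the reply generator with one or two arguments, depending on its arity.
def dispatch(gen_each_reply, requests, req_view)
  case gen_each_reply.arity
  when 1 then gen_each_reply.call(requests)
  when 2 then gen_each_reply.call(requests, req_view)
  else fail 'Illegal arity of reply generator'
  end
end

# Replies derived from the requests.
echo = ->(requests) { requests.map { |r| "echo: #{r}" } }
# Replies that also use the optional second argument.
tagged = ->(requests, view) { requests.map { |r| "#{view.peer}: #{r}" } }
# Replies that ignore the requests entirely.
ignores_input = ->(_requests) { %w[fixed reply] }

view = ReqView.new('client-1')
echo_replies = dispatch(echo, %w[a b], view)
tagged_replies = dispatch(tagged, %w[a], view)
fixed_replies = dispatch(ignores_input, %w[a b c], view)

# A generator taking no arguments is rejected.
bad = begin
  dispatch(-> { [] }, [], view)
rescue RuntimeError => e
  e.message
end
```

Note that a callable with optional parameters reports a negative arity in Ruby, so under this check it would fall through to the failure branch.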



# File 'src/ruby/lib/grpc/generic/bidi_call.rb', line 92

def run_on_server(gen_each_reply, requests)
  replies = nil

  # Pass in the optional call object parameter if possible
  if gen_each_reply.arity == 1
    replies = gen_each_reply.call(requests)
  elsif gen_each_reply.arity == 2
    replies = gen_each_reply.call(requests, @req_view)
  else
    fail 'Illegal arity of reply generator'
  end

  write_loop(replies, is_client: false)
end