Class: GRPC::BidiCall
- Inherits:
-
Object
- Object
- GRPC::BidiCall
- Includes:
- Core::CompletionType, Core::StatusCodes, Core::TimeConsts
- Defined in:
- lib/grpc/generic/bidi_call.rb
Overview
The BiDiCall class orchestrates exection of a BiDi stream on a client or server.
Instance Method Summary collapse
-
#initialize(call, q, marshal, unmarshal, deadline, finished_tag) ⇒ BidiCall
constructor
Creates a BidiCall.
-
#run_on_client(requests, &blk) ⇒ Object
Begins orchestration of the Bidi stream for a client sending requests.
-
#run_on_server(gen_each_reply) ⇒ Object
Begins orchestration of the Bidi stream for a server generating replies.
Methods included from Core::TimeConsts
Constructor Details
#initialize(call, q, marshal, unmarshal, deadline, finished_tag) ⇒ BidiCall
Creates a BidiCall.
BidiCall should only be created after a call is accepted. That means different things on a client and a server. On the client, the call is accepted after call.invoke. On the server, this is after call.accept.
#initialize cannot determine if the call is accepted or not; so if a call that’s not accepted is used here, the error won’t be visible until the BidiCall#run is called.
deadline is the absolute deadline for the call.
67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/grpc/generic/bidi_call.rb', line 67 def initialize(call, q, marshal, unmarshal, deadline, finished_tag) fail(ArgumentError, 'not a call') unless call.is_a? Core::Call unless q.is_a? Core::CompletionQueue fail(ArgumentError, 'not a CompletionQueue') end @call = call @cq = q @deadline = deadline @finished_tag = finished_tag @marshal = marshal @readq = Queue.new @unmarshal = unmarshal end |
Instance Method Details
#run_on_client(requests, &blk) ⇒ Object
Begins orchestration of the Bidi stream for a client sending requests.
The method either returns an Enumerator of the responses, or accepts a block that can be invoked with each response.
88 89 90 91 92 93 94 95 96 |
# File 'lib/grpc/generic/bidi_call.rb', line 88 def run_on_client(requests, &blk) enq_th = start_write_loop(requests) loop_th = start_read_loop replies = each_queued_msg return replies if blk.nil? replies.each { |r| blk.call(r) } enq_th.join loop_th.join end |
#run_on_server(gen_each_reply) ⇒ Object
Begins orchestration of the Bidi stream for a server generating replies.
N.B. gen_each_reply is a func(Enumerable<Requests>)
It takes an enumerable of requests as an arg, in case there is a relationship between the stream of requests and the stream of replies.
This does not mean that must necessarily be one. E.g, the replies produced by gen_each_reply could ignore the received_msgs
109 110 111 112 113 114 115 |
# File 'lib/grpc/generic/bidi_call.rb', line 109 def run_on_server(gen_each_reply) replys = gen_each_reply.call(each_queued_msg) enq_th = start_write_loop(replys, is_client: false) loop_th = start_read_loop loop_th.join enq_th.join end |