Class: GRPC::Core::Call

Inherits:
Object
  • Object
show all
Defined in:
ext/grpc/rb_call.c

Constant Summary collapse

INTERNAL_ALL_CALLs =
hash_all_calls

Instance Method Summary collapse

Constructor Details

#initializeObject

Init func that fails by raising an exception.



68
69
70
71
72
73
# File 'ext/grpc/rb_grpc.c', line 68

VALUE grpc_rb_cannot_init(VALUE self) {
  rb_raise(rb_eTypeError,
           "initialization of %s only allowed from the gRPC native layer",
           rb_obj_classname(self));
  return Qnil;
}

Instance Method Details

#add_metadata(completion_queue, hash_elements, flags = nil) ⇒ Object

Add metadata elements to the call from a ruby hash, to be sent upon invocation. flags is a bit-field combination of the write flags defined above. REQUIRES: grpc_call_invoke/grpc_call_accept have not been called on this call. Produces no events.



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'ext/grpc/rb_call.c', line 159

static VALUE grpc_rb_call_add_metadata(int argc, VALUE *argv, VALUE self) {
  VALUE metadata;
  VALUE flags = Qnil;
  ID id_size = rb_intern("size");

  /* "11" == 1 mandatory args, 1 (flags) is optional */
  rb_scan_args(argc, argv, "11", &metadata, &flags);
  if (NIL_P(flags)) {
    flags = UINT2NUM(0); /* Default to no flags */
  }
  if (TYPE(metadata) != T_HASH) {
    rb_raise(rb_eTypeError, "add metadata failed: metadata should be a hash");
    return Qnil;
  }
  if (NUM2UINT(rb_funcall(metadata, id_size, 0)) == 0) {
    return Qnil;
  }
  rb_ivar_set(self, id_flags, flags);
  rb_ivar_set(self, id_input_md, metadata);
  rb_hash_foreach(metadata, grpc_rb_call_add_metadata_hash_cb, self);
  return Qnil;
}

#cancelObject

Called by clients to cancel an RPC on the server.

Can be called multiple times, from any thread.


184
185
186
187
188
189
190
191
192
193
194
195
# File 'ext/grpc/rb_call.c', line 184

static VALUE grpc_rb_call_cancel(VALUE self) {
  grpc_call *call = NULL;
  grpc_call_error err;
  Data_Get_Struct(self, grpc_call, call);
  err = grpc_call_cancel(call);
  if (err != GRPC_CALL_OK) {
    rb_raise(rb_eCallError, "cancel failed: %s (code=%d)",
             grpc_call_error_detail_of(err), err);
  }

  return Qnil;
}

#initialize_copy(self) ⇒ Object

Init/Clone func that fails by raising an exception.



76
77
78
79
80
81
# File 'ext/grpc/rb_grpc.c', line 76

VALUE grpc_rb_cannot_init_copy(VALUE copy, VALUE self) {
  rb_raise(rb_eTypeError,
           "initialization of %s only allowed from the gRPC native layer",
           rb_obj_classname(copy));
  return Qnil;
}

#invoke(completion_queue, tag, flags = nil) ⇒ Object

Invoke the RPC. Starts sending metadata and request headers on the wire.

flags is a bit-field combination of the write flags defined above.
REQUIRES: Can be called at most once per call.
          Can only be called on the client.
Produces a GRPC_INVOKE_ACCEPTED event on completion.


206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'ext/grpc/rb_call.c', line 206

static VALUE grpc_rb_call_invoke(int argc, VALUE *argv, VALUE self) {
  VALUE cqueue = Qnil;
  VALUE metadata_read_tag = Qnil;
  VALUE finished_tag = Qnil;
  VALUE flags = Qnil;
  grpc_call *call = NULL;
  grpc_completion_queue *cq = NULL;
  grpc_call_error err;

  /* "31" == 3 mandatory args, 1 (flags) is optional */
  rb_scan_args(argc, argv, "31", &cqueue, &metadata_read_tag, &finished_tag,
               &flags);
  if (NIL_P(flags)) {
    flags = UINT2NUM(0); /* Default to no flags */
  }
  cq = grpc_rb_get_wrapped_completion_queue(cqueue);
  Data_Get_Struct(self, grpc_call, call);
  err = grpc_call_invoke_old(call, cq, ROBJECT(metadata_read_tag),
                             ROBJECT(finished_tag), NUM2UINT(flags));
  if (err != GRPC_CALL_OK) {
    rb_raise(rb_eCallError, "invoke failed: %s (code=%d)",
             grpc_call_error_detail_of(err), err);
  }

  /* Add the completion queue as an instance attribute, prevents it from being
   * GCed until this call object is GCed */
  rb_ivar_set(self, id_cq, cqueue);

  return Qnil;
}

#metadata=(call) ⇒ Object

Gets the metadata object saved the call.



283
284
285
# File 'ext/grpc/rb_call.c', line 283

static VALUE (VALUE self) {
  return rb_ivar_get(self, );
}

#metadata=(metadata) ⇒ Object

Saves the metadata hash on the call.



292
293
294
295
296
297
298
299
300
# File 'ext/grpc/rb_call.c', line 292

static VALUE grpc_rb_call_set_metadata(VALUE self, VALUE metadata) {
  if (!NIL_P(metadata) && TYPE(metadata) != T_HASH) {
    rb_raise(rb_eTypeError, "bad metadata: got:<%s> want: <Hash>",
             rb_obj_classname(metadata));
    return Qnil;
  }

  return rb_ivar_set(self, id_metadata, metadata);
}

#server_accept(completion_queue, finished_tag) ⇒ Object

Accept an incoming RPC, binding a completion queue to it.

To be called before sending or receiving messages.

REQUIRES: Can be called at most once per call.
          Can only be called on the server.
Produces a GRPC_FINISHED event with finished_tag when the call has been
    completed (there may be other events for the call pending at this
    time)


427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
# File 'ext/grpc/rb_call.c', line 427

static VALUE grpc_rb_call_server_accept(VALUE self, VALUE cqueue,
                                        VALUE finished_tag) {
  grpc_call *call = NULL;
  grpc_completion_queue *cq = grpc_rb_get_wrapped_completion_queue(cqueue);
  grpc_call_error err;
  Data_Get_Struct(self, grpc_call, call);
  err = grpc_call_server_accept_old(call, cq, ROBJECT(finished_tag));
  if (err != GRPC_CALL_OK) {
    rb_raise(rb_eCallError, "server_accept failed: %s (code=%d)",
             grpc_call_error_detail_of(err), err);
  }

  /* Add the completion queue as an instance attribute, prevents it from being
   * GCed until this call object is GCed */
  rb_ivar_set(self, id_cq, cqueue);
  return Qnil;
}

#server_end_initial_metadata(flag) ⇒ Object

Only to be called on servers, before sending messages.

flags is a bit-field combination of the write flags defined above.

REQUIRES: Can be called at most once per call.
          Can only be called on the server, must be called after
          grpc_call_server_accept
Produces no events


396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
# File 'ext/grpc/rb_call.c', line 396

static VALUE grpc_rb_call_server_end_initial_metadata(int argc, VALUE *argv,
                                                      VALUE self) {
  VALUE flags = Qnil;
  grpc_call *call = NULL;
  grpc_call_error err;

  /* "01" == 1 (flags) is optional */
  rb_scan_args(argc, argv, "01", &flags);
  if (NIL_P(flags)) {
    flags = UINT2NUM(0); /* Default to no flags */
  }
  Data_Get_Struct(self, grpc_call, call);
  err = grpc_call_server_end_initial_metadata_old(call, NUM2UINT(flags));
  if (err != GRPC_CALL_OK) {
    rb_raise(rb_eCallError, "end_initial_metadata failed: %s (code=%d)",
             grpc_call_error_detail_of(err), err);
  }
  return Qnil;
}

#start_read(tag) ⇒ Object

Initiate a read on a call. Output event contains a byte buffer with the

result of the read.
REQUIRES: No other reads are pending on the call. It is only safe to start
the next read after the corresponding read event is received.


241
242
243
244
245
246
247
248
249
250
251
252
# File 'ext/grpc/rb_call.c', line 241

static VALUE grpc_rb_call_start_read(VALUE self, VALUE tag) {
  grpc_call *call = NULL;
  grpc_call_error err;
  Data_Get_Struct(self, grpc_call, call);
  err = grpc_call_start_read_old(call, ROBJECT(tag));
  if (err != GRPC_CALL_OK) {
    rb_raise(rb_eCallError, "start read failed: %s (code=%d)",
             grpc_call_error_detail_of(err), err);
  }

  return Qnil;
}

#start_write(byte_buffer, tag, flags = nil) ⇒ Object

Queue a byte buffer for writing.

flags is a bit-field combination of the write flags defined above.
A write with byte_buffer null is allowed, and will not send any bytes on the
wire. If this is performed without GRPC_WRITE_BUFFER_HINT flag it provides
a mechanism to flush any previously buffered writes to outgoing flow control.
REQUIRES: No other writes are pending on the call. It is only safe to
          start the next write after the corresponding write_accepted event
          is received.
          GRPC_INVOKE_ACCEPTED must have been received by the application
          prior to calling this on the client. On the server,
          grpc_call_accept must have been called successfully.
Produces a GRPC_WRITE_ACCEPTED event.


318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
# File 'ext/grpc/rb_call.c', line 318

static VALUE grpc_rb_call_start_write(int argc, VALUE *argv, VALUE self) {
  VALUE byte_buffer = Qnil;
  VALUE tag = Qnil;
  VALUE flags = Qnil;
  grpc_call *call = NULL;
  grpc_byte_buffer *bfr = NULL;
  grpc_call_error err;

  /* "21" == 2 mandatory args, 1 (flags) is optional */
  rb_scan_args(argc, argv, "21", &byte_buffer, &tag, &flags);
  if (NIL_P(flags)) {
    flags = UINT2NUM(0); /* Default to no flags */
  }
  bfr = grpc_rb_get_wrapped_byte_buffer(byte_buffer);
  Data_Get_Struct(self, grpc_call, call);
  err = grpc_call_start_write_old(call, bfr, ROBJECT(tag), NUM2UINT(flags));
  if (err != GRPC_CALL_OK) {
    rb_raise(rb_eCallError, "start write failed: %s (code=%d)",
             grpc_call_error_detail_of(err), err);
  }

  return Qnil;
}

#start_write_status(code, status, tag) ⇒ Object

Queue a status for writing.

call-seq:
   tag = Object.new
   call.write_status(200, "OK", tag)

REQUIRES: No other writes are pending on the call. It is only safe to
start the next write after the corresponding write_accepted event
is received.
GRPC_INVOKE_ACCEPTED must have been received by the application
prior to calling this.
Only callable on the server.
Produces a GRPC_FINISHED event when the status is sent and the stream is
fully closed


356
357
358
359
360
361
362
363
364
365
366
367
368
369
# File 'ext/grpc/rb_call.c', line 356

static VALUE grpc_rb_call_start_write_status(VALUE self, VALUE code,
                                             VALUE status, VALUE tag) {
  grpc_call *call = NULL;
  grpc_call_error err;
  Data_Get_Struct(self, grpc_call, call);
  err = grpc_call_start_write_status_old(call, NUM2UINT(code),
                                         StringValueCStr(status), ROBJECT(tag));
  if (err != GRPC_CALL_OK) {
    rb_raise(rb_eCallError, "start write status: %s (code=%d)",
             grpc_call_error_detail_of(err), err);
  }

  return Qnil;
}

#status=(call) ⇒ Object

Gets the status object saved the call.



259
260
261
# File 'ext/grpc/rb_call.c', line 259

static VALUE grpc_rb_call_get_status(VALUE self) {
  return rb_ivar_get(self, id_status);
}

#status=(status) ⇒ Object

Saves a status object on the call.



268
269
270
271
272
273
274
275
276
# File 'ext/grpc/rb_call.c', line 268

static VALUE grpc_rb_call_set_status(VALUE self, VALUE status) {
  if (!NIL_P(status) && rb_obj_class(status) != rb_sStatus) {
    rb_raise(rb_eTypeError, "bad status: got:<%s> want: <Struct::Status>",
             rb_obj_classname(status));
    return Qnil;
  }

  return rb_ivar_set(self, id_status, status);
}

#writes_done(tag) ⇒ Object

No more messages to send.

REQUIRES: No other writes are pending on the call.


373
374
375
376
377
378
379
380
381
382
383
384
# File 'ext/grpc/rb_call.c', line 373

static VALUE grpc_rb_call_writes_done(VALUE self, VALUE tag) {
  grpc_call *call = NULL;
  grpc_call_error err;
  Data_Get_Struct(self, grpc_call, call);
  err = grpc_call_writes_done_old(call, ROBJECT(tag));
  if (err != GRPC_CALL_OK) {
    rb_raise(rb_eCallError, "writes done: %s (code=%d)",
             grpc_call_error_detail_of(err), err);
  }

  return Qnil;
}