Class: Hermann::Provider::RDKafka::Consumer
- Inherits:
-
Object
- Object
- Hermann::Provider::RDKafka::Consumer
- Defined in:
- ext/hermann/hermann_rdkafka.c
Instance Method Summary collapse
-
#consume(topic) ⇒ Object
Hermann::Provider::RDKafka::Consumer.consume.
-
#initialize(topic, brokers, partition, offset) ⇒ Object
constructor
consumer_initialize.
-
#initialize_copy(orig) ⇒ Object
consumer_init_copy.
Constructor Details
#initialize(topic, brokers, partition, offset) ⇒ Object
consumer_initialize
TODO: configure the brokers through the passed parameter for now; later, discover them through ZooKeeper.
Set up the Consumer’s HermannInstanceConfig context.
1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 |
# File 'ext/hermann/hermann_rdkafka.c', line 1002
/**
 * consumer_initialize
 *
 * Populate the HermannInstanceConfig wrapped by `self` with the consumer's
 * settings.
 *
 * @param self      the Consumer object being initialized
 * @param topic     Ruby String; a heap copy is stored in the config
 * @param brokers   Ruby String; a heap copy is stored in the config
 * @param partition Fixnum partition number
 * @param offset    Fixnum offset, or the Symbol :start / :end; anything
 *                  else (including an unrecognised Symbol) means "end"
 *
 * @return self
 *
 * Raises (via the Ruby API) if topic/brokers are not valid C strings or if
 * the string copies cannot be allocated.
 */
static VALUE consumer_initialize(VALUE self,
                                 VALUE topic,
                                 VALUE brokers,
                                 VALUE partition,
                                 VALUE offset) {
	HermannInstanceConfig* consumerConfig;
	const char* topicPtr;
	const char* brokersPtr;
	char* topicCopy;
	char* brokersCopy;
	int partitionNo;

	TRACER("initing consumer ruby object\n");

	/* StringValueCStr guarantees a NUL-terminated buffer, which strdup()
	 * requires; StringValuePtr does not make that promise. All conversions
	 * that can raise happen before anything is allocated. */
	topicPtr = StringValueCStr(topic);
	brokersPtr = StringValueCStr(brokers);
	/* NOTE(review): FIX2INT performs no type check; a non-Fixnum partition
	 * produces an unspecified value. Kept as-is for caller compatibility. */
	partitionNo = FIX2INT(partition);

	topicCopy = strdup(topicPtr);
	brokersCopy = strdup(brokersPtr);
	if (topicCopy == NULL || brokersCopy == NULL) {
		free(topicCopy);
		free(brokersCopy);
		rb_raise(rb_eNoMemError, "failed to allocate consumer configuration");
	}

	Data_Get_Struct(self, HermannInstanceConfig, consumerConfig);

	/* NOTE(review): any strings already stored in the config are not
	 * released here -- confirm the allocator's initial state before adding
	 * a free() for the re-initialize case. */
	consumerConfig->topic = topicCopy;
	consumerConfig->brokers = brokersCopy;
	consumerConfig->partition = partitionNo;
	consumerConfig->run = 1;
	consumerConfig->exit_eof = 0;
	consumerConfig->quiet = 0;

	/* Default to the end of the log so that every path, including an
	 * unrecognised Symbol, leaves start_offset with a defined value. */
	consumerConfig->start_offset = RD_KAFKA_OFFSET_END;
	if (FIXNUM_P(offset)) {
		consumerConfig->start_offset = FIX2LONG(offset);
	} else if (SYMBOL_P(offset) && offset == ID2SYM(rb_intern("start"))) {
		consumerConfig->start_offset = RD_KAFKA_OFFSET_BEGINNING;
	}

	return self;
}
|
Instance Method Details
#consume(topic) ⇒ Object
Hermann::Provider::RDKafka::Consumer.consume
462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 |
# File 'ext/hermann/hermann_rdkafka.c', line 462
/**
 * consumer_consume
 *
 * Start consuming from the partition and offset stored in this consumer's
 * HermannInstanceConfig, then run consumer_consume_loop under rb_ensure so
 * that consumer_consume_loop_stop is always invoked afterwards.
 *
 * @param self  the Consumer object
 * @param topic unused; the topic stored in the config is what is consumed
 *
 * @return the result of rb_ensure(consumer_consume_loop, ...)
 *
 * Raises RuntimeError if the configured topic is NULL/empty or if
 * rd_kafka_consume_start() fails.
 */
static VALUE consumer_consume(VALUE self, VALUE topic) {
	HermannInstanceConfig* consumerConfig;
	int savedErrno;
	const char* errMsg;

	(void)topic;

	TRACER("starting consume\n");

	Data_Get_Struct(self, HermannInstanceConfig, consumerConfig);

	if ((NULL == consumerConfig->topic) ||
		('\0' == consumerConfig->topic[0])) {
		fprintf(stderr, "Topic is null!\n");
		/* rb_raise does not return. */
		rb_raise(rb_eRuntimeError, "Topic cannot be empty");
	}

	if (!consumerConfig->isInitialized) {
		consumer_init_kafka(consumerConfig);
	}

	/* Start consuming */
	if (rd_kafka_consume_start(consumerConfig->rkt,
							   consumerConfig->partition,
							   consumerConfig->start_offset) == -1) {
		/* Capture errno before fprintf() has a chance to overwrite it, so
		 * the logged and the raised message describe the same failure. */
		savedErrno = errno;
		errMsg = rd_kafka_err2str(rd_kafka_errno2err(savedErrno));
		fprintf(stderr, "%% Failed to start consuming: %s\n", errMsg);
		rb_raise(rb_eRuntimeError, "%s", errMsg);
	}

	return rb_ensure(consumer_consume_loop, self, consumer_consume_loop_stop, self);
}
|
#initialize_copy(orig) ⇒ Object
consumer_init_copy
When copying into a new instance of a Consumer, reproduce the configuration info.
1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 |
# File 'ext/hermann/hermann_rdkafka.c', line 1050
/**
 * consumer_init_copy
 *
 * initialize_copy hook: reproduce the configuration of `orig` in `copy`.
 *
 * The struct is copied wholesale, then the heap-allocated topic/brokers
 * strings are replaced with private duplicates so the two objects do not
 * point at the same buffers. The topic handle is cleared and the copy is
 * marked uninitialized so that consuming from it goes through
 * consumer_init_kafka again instead of reusing the original's handle.
 *
 * @param copy the freshly allocated Consumer receiving the configuration
 * @param orig the Consumer being duplicated
 *
 * @return copy
 *
 * Raises TypeError if `orig` is not a Consumer data object, and
 * NoMemoryError if the strings cannot be duplicated.
 */
static VALUE consumer_init_copy(VALUE copy,
                                VALUE orig) {
	HermannInstanceConfig* orig_config;
	HermannInstanceConfig* copy_config;
	char* topic_dup = NULL;
	char* brokers_dup = NULL;

	if (copy == orig) {
		return copy;
	}

	if (TYPE(orig) != T_DATA || RDATA(orig)->dfree != (RUBY_DATA_FUNC)consumer_free) {
		rb_raise(rb_eTypeError, "wrong argument type");
	}

	Data_Get_Struct(orig, HermannInstanceConfig, orig_config);
	Data_Get_Struct(copy, HermannInstanceConfig, copy_config);

	/* Duplicate the owned strings before touching the copy, so a failed
	 * allocation leaves the copy unmodified. */
	if (orig_config->topic != NULL) {
		topic_dup = strdup(orig_config->topic);
	}
	if (orig_config->brokers != NULL) {
		brokers_dup = strdup(orig_config->brokers);
	}
	if ((orig_config->topic != NULL && topic_dup == NULL) ||
		(orig_config->brokers != NULL && brokers_dup == NULL)) {
		free(topic_dup);
		free(brokers_dup);
		rb_raise(rb_eNoMemError, "failed to copy consumer configuration");
	}

	// Copy over the plain data from one struct to the other
	MEMCPY(copy_config, orig_config, HermannInstanceConfig, 1);

	// Replace the aliased pointers with state owned by the copy
	copy_config->topic = topic_dup;
	copy_config->brokers = brokers_dup;
	copy_config->rkt = NULL;
	copy_config->isInitialized = 0;
	/* NOTE(review): if HermannInstanceConfig holds further handles or owned
	 * pointers not visible here, they are still shared with `orig` and
	 * should be reset the same way -- confirm against the struct definition
	 * and consumer_free. */

	return copy;
}
|