Class: Hermann::Provider::RDKafka::Consumer

Inherits:
Object
  • Object
show all
Defined in:
ext/hermann/hermann_rdkafka.c

Instance Method Summary collapse

Constructor Details

#initialize(topic, brokers, partition, offset) ⇒ Object

consumer_initialize

todo: configure the brokers through passed parameter, later through zk

Set up the Consumer’s HermannInstanceConfig context.

Parameters:

  • self

    VALUE the Ruby instance of the Consumer

  • topic

    VALUE a Ruby string

  • brokers

    VALUE a Ruby string containing list of host:port

  • partition

    VALUE a Ruby number

  • offset

    VALUE a Ruby number



1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
# File 'ext/hermann/hermann_rdkafka.c', line 1002

static VALUE consumer_initialize(VALUE self,
								 VALUE topic,
								 VALUE brokers,
								 VALUE partition,
								 VALUE offset) {

	HermannInstanceConfig* consumerConfig;
	char* topicPtr;
	char* brokersPtr;
	int partitionNo;

	TRACER("initing consumer ruby object\n");

	topicPtr = StringValuePtr(topic);
	brokersPtr = StringValuePtr(brokers);
	partitionNo = FIX2INT(partition);
	Data_Get_Struct(self, HermannInstanceConfig, consumerConfig);

	consumerConfig->topic = strdup(topicPtr);
	consumerConfig->brokers = strdup(brokersPtr);
	consumerConfig->partition = partitionNo;
	consumerConfig->run = 1;
	consumerConfig->exit_eof = 0;
	consumerConfig->quiet = 0;

	if ( FIXNUM_P(offset) ) {
		consumerConfig->start_offset = FIX2LONG(offset);
	} else if ( SYMBOL_P(offset) ) {
		if ( offset == ID2SYM(rb_intern("start")) )
			consumerConfig->start_offset = RD_KAFKA_OFFSET_BEGINNING;
		else if ( offset == ID2SYM(rb_intern("end")) )
			consumerConfig->start_offset = RD_KAFKA_OFFSET_END;
	} else {
		consumerConfig->start_offset = RD_KAFKA_OFFSET_END;
	}

	return self;
}

Instance Method Details

#consume(topic) ⇒ Object

Hermann::Provider::RDKafka::Consumer.consume

Parameters:

  • VALUE

    self the Ruby object for this consumer

  • VALUE

    topic the Ruby string representing a topic to consume



462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
# File 'ext/hermann/hermann_rdkafka.c', line 462

static VALUE consumer_consume(VALUE self, VALUE topic) {

	HermannInstanceConfig* consumerConfig;

	TRACER("starting consume\n");

	Data_Get_Struct(self, HermannInstanceConfig, consumerConfig);

	if ((NULL == consumerConfig->topic) ||
		(0 == strlen(consumerConfig->topic))) {
		fprintf(stderr, "Topic is null!\n");
		rb_raise(rb_eRuntimeError, "Topic cannot be empty");
		return self;
	}

	if (!consumerConfig->isInitialized) {
		consumer_init_kafka(consumerConfig);
	}

	/* Start consuming */
	if (rd_kafka_consume_start(consumerConfig->rkt, consumerConfig->partition, consumerConfig->start_offset) == -1) {
		fprintf(stderr, "%% Failed to start consuming: %s\n",
			rd_kafka_err2str(rd_kafka_errno2err(errno)));
		rb_raise(rb_eRuntimeError, "%s",
				rd_kafka_err2str(rd_kafka_errno2err(errno)));
		return Qnil;
	}

	return rb_ensure(consumer_consume_loop, self, consumer_consume_loop_stop, self);
}

#initialize_copy(orig) ⇒ Object

consumer_init_copy

When copying into a new instance of a Consumer, reproduce the configuration info.

Parameters:

  • copy

    VALUE the Ruby Consumer instance (with configuration) as destination

  • orig

    VALUE the Ruby Consumer instance (with configuration) as source



1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
# File 'ext/hermann/hermann_rdkafka.c', line 1050

static VALUE consumer_init_copy(VALUE copy,
								VALUE orig) {
	HermannInstanceConfig* orig_config;
	HermannInstanceConfig* copy_config;

	if (copy == orig) {
		return copy;
	}

	if (TYPE(orig) != T_DATA || RDATA(orig)->dfree != (RUBY_DATA_FUNC)consumer_free) {
		rb_raise(rb_eTypeError, "wrong argument type");
	}

	Data_Get_Struct(orig, HermannInstanceConfig, orig_config);
	Data_Get_Struct(copy, HermannInstanceConfig, copy_config);

	// Copy over the data from one struct to the other
	MEMCPY(copy_config, orig_config, HermannInstanceConfig, 1);

	return copy;
}