Class: Hermann::Lib::Consumer

Inherits:
Object
  • Object
show all
Defined in:
ext/hermann/hermann_lib.c

Instance Method Summary collapse

Constructor Details

#initialize(topic, brokers, partition, offset) ⇒ Object

consumer_initialize

todo: configure the brokers through passed parameter, later through zk

Set up the Consumer’s HermannInstanceConfig context.

Parameters:

  • self

    VALUE the Ruby instance of the Consumer

  • topic

    VALUE a Ruby string

  • brokers

    VALUE a Ruby string containing list of host:port

  • partition

    VALUE a Ruby number

  • offset

    VALUE a Ruby number



825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
# File 'ext/hermann/hermann_lib.c', line 825

static VALUE consumer_initialize(VALUE self,
								 VALUE topic,
								 VALUE brokers,
								 VALUE partition,
								 VALUE offset) {

	HermannInstanceConfig* consumerConfig;
	char* topicPtr;
	char* brokersPtr;
	int partitionNo;

	TRACER("initing consumer ruby object\n");

	topicPtr = StringValuePtr(topic);
	brokersPtr = StringValuePtr(brokers);
	partitionNo = FIX2INT(partition);
	Data_Get_Struct(self, HermannInstanceConfig, consumerConfig);

	consumerConfig->topic = strdup(topicPtr);
	consumerConfig->brokers = strdup(brokersPtr);
	consumerConfig->partition = partitionNo;
	consumerConfig->run = 1;
	consumerConfig->exit_eof = 0;
	consumerConfig->quiet = 0;

	if ( FIXNUM_P(offset) ) {
		consumerConfig->start_offset = FIX2LONG(offset);
	} else if ( SYMBOL_P(offset) ) {
		if ( offset == ID2SYM(rb_intern("start")) )
			consumerConfig->start_offset = RD_KAFKA_OFFSET_BEGINNING;
		else if ( offset == ID2SYM(rb_intern("end")) )
			consumerConfig->start_offset = RD_KAFKA_OFFSET_END;
	} else {
		consumerConfig->start_offset = RD_KAFKA_OFFSET_END;
	}

	return self;
}

Instance Method Details

#consume(topic) ⇒ Object

Hermann::Lib::Consumer.consume

Parameters:

  • VALUE

    self the Ruby object for this consumer

  • VALUE

    topic the Ruby string representing a topic to consume



426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
# File 'ext/hermann/hermann_lib.c', line 426

static VALUE consumer_consume(VALUE self, VALUE topic) {

	HermannInstanceConfig* consumerConfig;

	TRACER("starting consume\n");

	Data_Get_Struct(self, HermannInstanceConfig, consumerConfig);

	if ((NULL == consumerConfig->topic) ||
		(0 == strlen(consumerConfig->topic))) {
		fprintf(stderr, "Topic is null!\n");
		rb_raise(rb_eRuntimeError, "Topic cannot be empty");
		return self;
	}

	if (!consumerConfig->isInitialized) {
		consumer_init_kafka(consumerConfig);
	}

	/* Start consuming */
	if (rd_kafka_consume_start(consumerConfig->rkt, consumerConfig->partition, consumerConfig->start_offset) == -1) {
		fprintf(stderr, "%% Failed to start consuming: %s\n",
			rd_kafka_err2str(rd_kafka_errno2err(errno)));
		rb_raise(rb_eRuntimeError,
				rd_kafka_err2str(rd_kafka_errno2err(errno)));
		return Qnil;
	}

  consumer_consume_loop(consumerConfig);

	/* Stop consuming */
	rd_kafka_consume_stop(consumerConfig->rkt, consumerConfig->partition);

	return Qnil;
}

#initialize_copy(orig) ⇒ Object

consumer_init_copy

When copying into a new instance of a Consumer, reproduce the configuration info.

Parameters:

  • copy

    VALUE the Ruby Consumer instance (with configuration) as destination

  • orig

    VALUE the Ruby Consumer instance (with configuration) as source



873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
# File 'ext/hermann/hermann_lib.c', line 873

static VALUE consumer_init_copy(VALUE copy,
								VALUE orig) {
	HermannInstanceConfig* orig_config;
	HermannInstanceConfig* copy_config;

	if (copy == orig) {
		return copy;
	}

	if (TYPE(orig) != T_DATA || RDATA(orig)->dfree != (RUBY_DATA_FUNC)consumer_free) {
		rb_raise(rb_eTypeError, "wrong argument type");
	}

	Data_Get_Struct(orig, HermannInstanceConfig, orig_config);
	Data_Get_Struct(copy, HermannInstanceConfig, copy_config);

	// Copy over the data from one struct to the other
	MEMCPY(copy_config, orig_config, HermannInstanceConfig, 1);

	return copy;
}