Class: Hermann::Provider::RDKafka::Producer

Inherits:
Object
  • Object
show all
Defined in:
ext/hermann/hermann_rdkafka.c

Instance Method Summary collapse

Constructor Details

#initialize(brokers) ⇒ Object

producer_initialize

Set up the configuration context for the Producer instance

Parameters:

  • self

    VALUE the Producer instance

  • brokers

    VALUE a Ruby string containing host:port pairs separated by commas



1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
# File 'ext/hermann/hermann_rdkafka.c', line 1148

/**
 * producer_initialize
 *
 * Set up the configuration context for the Producer instance.
 *
 * @param   self     VALUE   the Producer instance
 * @param   brokers  VALUE   a Ruby string containing host:port pairs
 *                           separated by commas
 *
 * @return  VALUE    self
 */
static VALUE producer_initialize(VALUE self,
								 VALUE brokers) {

	HermannInstanceConfig* producerConfig;
	char* brokersPtr;

	TRACER("initialize Producer ruby object\n");

	/* StringValueCStr (unlike StringValuePtr) guarantees a NUL-terminated
	 * buffer and raises ArgumentError if the string contains an embedded
	 * NUL byte.
	 */
	brokersPtr = StringValueCStr(brokers);
	Data_Get_Struct(self, HermannInstanceConfig, producerConfig);

	/* NOTE(review): this stores a pointer into the Ruby String's own buffer;
	 * it stays valid only while `brokers` is alive and unmodified. Confirm
	 * the caller keeps a reference, or copy the string and release it in the
	 * free function.
	 */
	producerConfig->brokers = brokersPtr;
	/** Using RD_KAFKA_PARTITION_UA specifies we want the partitioner callback to be called to determine the target
	 *  partition
	 */
	producerConfig->partition = RD_KAFKA_PARTITION_UA;
	producerConfig->run = 1;
	producerConfig->exit_eof = 0;
	producerConfig->quiet = 0;

	return self;
}

Instance Method Details

#connect(timeout) ⇒ Object

Producer.connect



745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
# File 'ext/hermann/hermann_rdkafka.c', line 745

/**
 * producer_connect
 *
 * Issue a metadata request (no specific topic) against the producer's kafka
 * handle and record the outcome on the producer's configuration: isConnected
 * is set on success, isErrored receives the error code on failure.
 *
 * @param   self     VALUE   the Ruby producer instance
 * @param   timeout  VALUE   a Ruby number, converted with rb_num2int and used
 *                           as the request timeout in milliseconds
 *
 * @return  VALUE    Qtrue when the request reported no error, else Qfalse
 */
static VALUE producer_connect(VALUE self, VALUE timeout) {
	HermannInstanceConfig *config;
	hermann_metadata_ctx_t request;
	rd_kafka_resp_err_t status;
	VALUE connected = Qfalse;

	Data_Get_Struct(self, HermannInstanceConfig, config);

	/* Lazily bring up the kafka handle on first use */
	if (!config->isInitialized) {
		producer_init_kafka(self, config);
	}

	request.rk = config->rk;
	request.topic = NULL;
	request.data = NULL;
	request.timeout_ms = rb_num2int(timeout);

	status = producer_metadata_request(&request);

	TRACER("err: %s (%i)\n", rd_kafka_err2str(status), status);

	if (RD_KAFKA_RESP_ERR_NO_ERROR != status) {
		config->isErrored = status;
	}
	else {
		TRACER("brokers: %i, topics: %i\n",
				request.data->broker_cnt,
				request.data->topic_cnt);
		config->isConnected = 1;
		connected = Qtrue;
	}

	/* Release whatever metadata the request handed back, on either path */
	if ( request.data ) {
		rd_kafka_metadata_destroy(request.data);
	}

	return connected;
}

#connected? ⇒ Boolean

Producer.connected?

Returns:

  • (Boolean)


889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
# File 'ext/hermann/hermann_rdkafka.c', line 889

/**
 * producer_is_connected
 *
 * Report whether the producer has been both initialized and marked as
 * connected.
 *
 * @param   self   VALUE   the Ruby producer instance
 *
 * @return  VALUE  Qtrue when both the isInitialized and isConnected flags
 *                 are set, Qfalse otherwise
 */
static VALUE producer_is_connected(VALUE self) {
	HermannInstanceConfig *config;

	Data_Get_Struct(self, HermannInstanceConfig, config);

	return (config->isInitialized && config->isConnected) ? Qtrue : Qfalse;
}

#errored? ⇒ Boolean

Producer.errored?

Returns:

  • (Boolean)


905
906
907
908
909
910
911
912
913
914
915
# File 'ext/hermann/hermann_rdkafka.c', line 905

/**
 * producer_is_errored
 *
 * Report whether an error has been recorded on the producer.
 *
 * @param   self   VALUE   the Ruby producer instance
 *
 * @return  VALUE  Qtrue when the isErrored field is non-zero, else Qfalse
 */
static VALUE producer_is_errored(VALUE self) {
	HermannInstanceConfig *config;

	Data_Get_Struct(self, HermannInstanceConfig, config);

	return config->isErrored ? Qtrue : Qfalse;
}

#initialize_copy(orig) ⇒ Object

producer_init_copy

Copy the configuration information from orig into copy for the given Producer instances.

Parameters:

  • copy

    VALUE destination Producer

  • orig

    VALUE source Producer



1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
# File 'ext/hermann/hermann_rdkafka.c', line 1180

/**
 * producer_init_copy
 *
 * Copy the configuration information from orig into copy for the given
 * Producer instances. Copying an object onto itself is a no-op.
 *
 * Raises TypeError when orig is not a T_DATA object whose free function is
 * producer_free.
 *
 * @param   copy   VALUE   destination Producer
 * @param   orig   VALUE   source Producer
 *
 * @return  VALUE  copy
 */
static VALUE producer_init_copy(VALUE copy,
								VALUE orig) {
	HermannInstanceConfig* source;
	HermannInstanceConfig* target;

	if (copy != orig) {
		/* Only accept a source that wraps a producer configuration */
		if (!(TYPE(orig) == T_DATA && RDATA(orig)->dfree == (RUBY_DATA_FUNC)producer_free)) {
			rb_raise(rb_eTypeError, "wrong argument type");
		}

		Data_Get_Struct(orig, HermannInstanceConfig, source);
		Data_Get_Struct(copy, HermannInstanceConfig, target);

		/* Byte-wise copy of the whole struct.
		 * NOTE(review): any pointer members end up shared between both
		 * instances -- confirm producer_free tolerates that.
		 */
		MEMCPY(target, source, HermannInstanceConfig, 1);
	}

	return copy;
}

#metadata(topicStr, timeout) ⇒ Object

producer_metadata

make a metadata request to the kafka server, returning a hash containing a list of brokers and topics.

Parameters:

  • data

    struct rd_kafka_metadata* data returned from rd_kafka_metadata



854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
# File 'ext/hermann/hermann_rdkafka.c', line 854

/**
 * producer_metadata
 *
 * Make a metadata request to the kafka server and return the value built by
 * producer_metadata_make_hash from the response.
 *
 * Raises RuntimeError when the topic handle cannot be created or when the
 * metadata request reports an error.
 *
 * @param   self      VALUE   the Ruby producer instance
 * @param   topicStr  VALUE   nil to request without a specific topic, or a
 *                            Ruby String naming the topic
 * @param   timeout   VALUE   a Ruby number, converted with rb_num2int and
 *                            used as the request timeout in milliseconds
 *
 * @return  VALUE     the result of producer_metadata_make_hash
 */
static VALUE producer_metadata(VALUE self, VALUE topicStr, VALUE timeout) {
	HermannInstanceConfig *producerConfig;
	rd_kafka_resp_err_t err;
	hermann_metadata_ctx_t md_context;
	VALUE result;

	Data_Get_Struct(self, HermannInstanceConfig, producerConfig);

	if (!producerConfig->isInitialized) {
		producer_init_kafka(self, producerConfig);
	}

	/* Fully initialize the context so no field is ever read uninitialized */
	md_context.rk = producerConfig->rk;
	md_context.topic = NULL;
	md_context.data = NULL;
	md_context.timeout_ms = rb_num2int(timeout);

	if ( !NIL_P(topicStr) ) {
		Check_Type(topicStr, T_STRING);
		/* StringValueCStr guarantees NUL termination; it can raise, but
		 * nothing has been allocated by this function at that point.
		 */
		md_context.topic = rd_kafka_topic_new(producerConfig->rk, StringValueCStr(topicStr), NULL);

		/* A NULL handle would otherwise be indistinguishable from the
		 * "no topic given" case below.
		 */
		if ( NULL == md_context.topic ) {
			rb_raise(rb_eRuntimeError, "Could not construct a topic structure");
		}
	}

	err = producer_metadata_request(&md_context);

	/* This function created the topic handle, so it releases it -- before
	 * any rb_raise below, which does not return.
	 */
	if ( md_context.topic ) {
		rd_kafka_topic_destroy(md_context.topic);
		md_context.topic = NULL;
	}

	if ( err != RD_KAFKA_RESP_ERR_NO_ERROR ) {
		if ( md_context.data ) {
			rd_kafka_metadata_destroy(md_context.data);
		}
		// annoyingly, this is always a timeout error -- the rest rdkafka just jams onto STDERR
		rb_raise( rb_eRuntimeError, "%s", rd_kafka_err2str(err) );
	}

	result = producer_metadata_make_hash(md_context.data);
	rd_kafka_metadata_destroy(md_context.data);
	return result;
}

#push_single(message, topic, result) ⇒ Object

producer_push_single

push completes

Parameters:

  • self

    VALUE the Ruby producer instance

  • message

    VALUE the ruby String containing the outgoing message.

  • topic

    VALUE the ruby String containing the topic to use for the outgoing message.

  • result

    VALUE the Hermann::Result object to be fulfilled when the push completes



595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
# File 'ext/hermann/hermann_rdkafka.c', line 595

/**
 * producer_push_single
 *
 * Hand a single message to librdkafka for the given topic. The payload is
 * copied by librdkafka (RD_KAFKA_MSG_F_COPY). A heap-allocated delivery
 * context carrying the producer configuration and the optional result object
 * is passed as the per-message opaque.
 *
 * Raises ArgumentError when topic is nil or empty, TypeError when topic or
 * message is not a String, RuntimeError when the topic handle cannot be
 * created, and NoMemoryError when the delivery context cannot be allocated.
 * A failure of rd_kafka_produce itself is only reported on stderr.
 *
 * @param   self     VALUE   the Ruby producer instance
 * @param   message  VALUE   the ruby String containing the outgoing message.
 * @param   topic    VALUE   the ruby String containing the topic to use for
 *                           the outgoing message.
 * @param   result   VALUE   the Hermann::Result object to be fulfilled when
 *                           the push completes, or nil
 *
 * @return  VALUE    self
 */
static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE result) {

	HermannInstanceConfig* producerConfig;
	/* Context pointer, pointing to `result`, for the librdkafka delivery
	 * callback. Allocated only after every check that can raise has passed,
	 * so no error path leaks it.
	 */
	hermann_push_ctx_t *delivery_ctx = NULL;
	rd_kafka_topic_t *rkt = NULL;

	TRACER("self: %p, message: %p, result: %p)\n", (void *)self, (void *)message, (void *)result);

	Data_Get_Struct(self, HermannInstanceConfig, producerConfig);

	TRACER("producerConfig: %p\n", (void *)producerConfig);

	if (NIL_P(topic)) {
		rb_raise(rb_eArgError, "Topic cannot be empty");
	}

	/* RSTRING_LEN/RSTRING_PTR are only valid on actual Strings */
	Check_Type(topic, T_STRING);
	Check_Type(message, T_STRING);

	if (0 == RSTRING_LEN(topic)) {
		rb_raise(rb_eArgError, "Topic cannot be empty");
	}

	if (!producerConfig->isInitialized) {
		producer_init_kafka(self, producerConfig);
	}

	TRACER("kafka initialized\n");

	/* StringValueCStr guarantees the topic name is NUL-terminated */
	rkt = rd_kafka_topic_new(producerConfig->rk,
								StringValueCStr(topic),
								NULL);

	if (NULL == rkt) {
		rb_raise(rb_eRuntimeError, "Could not construct a topic structure");
	}

	delivery_ctx = malloc(sizeof *delivery_ctx);

	if (NULL == delivery_ctx) {
		/* rb_raise does not return, so release the topic handle first */
		rd_kafka_topic_destroy(rkt);
		rb_raise(rb_eNoMemError, "Could not allocate a delivery context");
	}

	delivery_ctx->producer = producerConfig;
	delivery_ctx->result = (VALUE) NULL;

	/* Only pass result through if it's non-nil */
	if (Qnil != result) {
		delivery_ctx->result = result;
		TRACER("setting result: %p\n", (void *)result);
	}

	TRACER("rd_kafka_produce() message of %ld bytes\n", (long)RSTRING_LEN(message));

	/* Send/Produce message. */
	if (-1 == rd_kafka_produce(rkt,
						 producerConfig->partition,
						 RD_KAFKA_MSG_F_COPY,
						 RSTRING_PTR(message),
						 RSTRING_LEN(message),
						 NULL,
						 0,
						 delivery_ctx)) {
		/* Capture errno before any other call has a chance to clobber it */
		int produce_errno = errno;

		/* Report against the topic handle that was actually used */
		fprintf(stderr, "%% Failed to produce to topic %s partition %i: %s\n",
					rd_kafka_topic_name(rkt), producerConfig->partition,
					rd_kafka_err2str(rd_kafka_errno2err(produce_errno)));

		/* The message was not accepted, so nothing else holds the context;
		 * release it here.
		 */
		free(delivery_ctx);
		delivery_ctx = NULL;
		/* TODO: raise a Ruby exception here, requires a test though */
	}

	rd_kafka_topic_destroy(rkt);

	TRACER("returning\n");

	return self;
}

#tick(timeout) ⇒ Object

producer_tick

This function is responsible for ticking the librdkafka reactor so we can get feedback from the librdkafka threads back into the Ruby environment

@param  self	VALUE   the Ruby producer instance
@param  timeout VALUE   A Ruby Fixnum of how many ms we should wait on librdkafka


675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
# File 'ext/hermann/hermann_rdkafka.c', line 675

/**
 * producer_tick
 *
 * Tick the librdkafka reactor by polling the producer's kafka handle, so
 * feedback from the librdkafka threads can get back into the Ruby
 * environment.
 *
 * Raises ArgumentError for a nil timeout, RuntimeError when the producer has
 * not been initialized, and StandardError (with the stored error text) when
 * the producer is flagged as errored after polling.
 *
 * @param   self     VALUE   the Ruby producer instance
 * @param   timeout  VALUE   a Ruby number of how many ms we should wait on
 *                           librdkafka
 *
 * @return  VALUE    the value returned by rd_kafka_poll, as a Ruby Integer
 */
static VALUE producer_tick(VALUE self, VALUE timeout) {
	hermann_conf_t *producer = NULL;
	long wait_ms = 0;
	int served = 0;

	if (Qnil == timeout) {
		rb_raise(rb_eArgError, "Cannot call `tick` with a nil timeout!\n");
	}

	wait_ms = rb_num2int(timeout);

	Data_Get_Struct(self, hermann_conf_t, producer);

	/*
	 * An uninitialized producer means producer_push_single was never
	 * properly called, so there is nothing to tick.
	 */
	if (!producer->isInitialized) {
		rb_raise(rb_eRuntimeError, "Cannot call `tick` without having ever sent a message\n");
	}

	served = rd_kafka_poll(producer->rk, wait_ms);

	/* Checked after polling, so an error flagged by then is surfaced here */
	if (producer->isErrored) {
		rb_raise(rb_eStandardError, "%s", producer->error);
	}

	return rb_int_new(served);
}