Class: Hermann::Provider::RDKafka::Producer

Inherits:
Object
  • Object
show all
Defined in:
ext/hermann/hermann_rdkafka.c

Instance Method Summary collapse

Constructor Details

#initialize(brokers) ⇒ Object

producer_initialize

Set up the configuration context for the Producer instance

Parameters:

  • self

    VALUE the Producer instance

  • brokers

    VALUE a Ruby string containing host:port pairs separated by commas



1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
# File 'ext/hermann/hermann_rdkafka.c', line 1167

/**
 * producer_initialize
 *
 * Set up the configuration context for the Producer instance. No librdkafka
 * call is made here; only fields of the wrapped HermannInstanceConfig are
 * filled in.
 *
 * @param   self    VALUE   the Producer instance
 * @param   brokers VALUE   a Ruby string containing host:port pairs separated by commas
 *
 * @return  self
 *
 * Raises (via StringValueCStr) TypeError when brokers cannot be converted to
 * a String, and ArgumentError when it contains an embedded NUL byte.
 */
static VALUE producer_initialize(VALUE self,
								 VALUE brokers) {

	HermannInstanceConfig* producerConfig;
	char* brokersPtr;

	TRACER("initialize Producer ruby object\n");

	/* The pointer is stored as a bare char*, so it has to be a well-formed,
	 * NUL-terminated C string; StringValueCStr enforces that.
	 */
	brokersPtr = StringValueCStr(brokers);
	Data_Get_Struct(self, HermannInstanceConfig, producerConfig);

	/* NOTE(review): this borrows the Ruby String's internal buffer rather than
	 * copying it, so it is only valid while `brokers` stays alive and
	 * unmodified on the Ruby side -- confirm the caller keeps a reference, or
	 * switch to an owned copy together with a matching free in the finalizer.
	 */
	producerConfig->brokers = brokersPtr;
	/** Using RD_KAFKA_PARTITION_UA specifies we want the partitioner callback to be called to determine the target
	 *  partition
	 */
	producerConfig->partition = RD_KAFKA_PARTITION_UA;
	producerConfig->run = 1;
	producerConfig->exit_eof = 0;
	producerConfig->quiet = 0;

	return self;
}

Instance Method Details

#connect(timeout) ⇒ Object

Producer.connect



764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
# File 'ext/hermann/hermann_rdkafka.c', line 764

/**
 * producer_connect
 *
 * Issue a metadata request (with no topic) through
 * producer_metadata_request() and record the outcome on the producer
 * configuration.
 *
 * @param   self    VALUE   the Ruby producer instance
 * @param   timeout VALUE   a Ruby number converted with rb_num2int and used as
 *                          the request's timeout_ms
 *
 * @return  Qtrue when the request returned RD_KAFKA_RESP_ERR_NO_ERROR (and
 *          isConnected has been set), Qfalse otherwise (and the error code has
 *          been stored in isErrored)
 */
static VALUE producer_connect(VALUE self, VALUE timeout) {
	HermannInstanceConfig *config;
	hermann_metadata_ctx_t request;
	rd_kafka_resp_err_t status;
	VALUE connected;

	Data_Get_Struct(self, HermannInstanceConfig, config);

	/* Set up the librdkafka side on first use */
	if (!config->isInitialized) {
		producer_init_kafka(self, config);
	}

	request.rk = config->rk;
	request.topic = NULL;
	request.data = NULL;
	request.timeout_ms = rb_num2int(timeout);

	status = producer_metadata_request(&request);

	TRACER("err: %s (%i)\n", rd_kafka_err2str(status), status);

	if (RD_KAFKA_RESP_ERR_NO_ERROR != status) {
		config->isErrored = status;
		connected = Qfalse;
	}
	else {
		TRACER("brokers: %i, topics: %i\n",
				request.data->broker_cnt,
				request.data->topic_cnt);
		config->isConnected = 1;
		connected = Qtrue;
	}

	/* Release the metadata whenever the request handed some back */
	if (NULL != request.data) {
		rd_kafka_metadata_destroy(request.data);
	}

	return connected;
}

#connected?Boolean

Producer.connected?

Returns:

  • (Boolean)


908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
# File 'ext/hermann/hermann_rdkafka.c', line 908

/**
 * producer_is_connected
 *
 * @param   self    VALUE   the Ruby producer instance
 *
 * @return  Qtrue only when both the isInitialized and isConnected flags of
 *          the producer configuration are set, Qfalse otherwise
 */
static VALUE producer_is_connected(VALUE self) {
	HermannInstanceConfig *config;

	Data_Get_Struct(self, HermannInstanceConfig, config);

	return (config->isInitialized && config->isConnected) ? Qtrue : Qfalse;
}

#errored?Boolean

Producer.errored?

Returns:

  • (Boolean)


924
925
926
927
928
929
930
931
932
933
934
# File 'ext/hermann/hermann_rdkafka.c', line 924

/**
 * producer_is_errored
 *
 * @param   self    VALUE   the Ruby producer instance
 *
 * @return  Qtrue when the isErrored field of the producer configuration is
 *          non-zero, Qfalse otherwise
 */
static VALUE producer_is_errored(VALUE self) {
	HermannInstanceConfig *config;

	Data_Get_Struct(self, HermannInstanceConfig, config);

	return config->isErrored ? Qtrue : Qfalse;
}

#initialize_copy(orig) ⇒ Object

producer_init_copy

Copy the configuration information from orig into copy for the given Producer instances.

Parameters:

  • copy

    VALUE destination Producer

  • orig

    VALUE source Producer



1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
# File 'ext/hermann/hermann_rdkafka.c', line 1199

/**
 * producer_init_copy
 *
 * Copy the configuration information from orig into copy for the given
 * Producer instances.
 *
 * The settings are copied wholesale, but the state that belongs to a live
 * librdkafka handle (rk, rkt and the initialized/connected/errored flags) is
 * reset on the copy, so the copy sets up its own handle the next time a
 * method finds isInitialized unset instead of aliasing the original's.
 *
 * @param   copy    VALUE   destination Producer
 * @param   orig    VALUE   source Producer
 *
 * @return  copy
 *
 * Raises TypeError when orig is not a T_DATA object finalized by
 * producer_free.
 */
static VALUE producer_init_copy(VALUE copy,
								VALUE orig) {
	HermannInstanceConfig* orig_config;
	HermannInstanceConfig* copy_config;

	if (copy == orig) {
		return copy;
	}

	if (TYPE(orig) != T_DATA || RDATA(orig)->dfree != (RUBY_DATA_FUNC)producer_free) {
		rb_raise(rb_eTypeError, "wrong argument type");
	}

	Data_Get_Struct(orig, HermannInstanceConfig, orig_config);
	Data_Get_Struct(copy, HermannInstanceConfig, copy_config);

	// Copy over the data from one struct to the other
	MEMCPY(copy_config, orig_config, HermannInstanceConfig, 1);

	/* Both objects are finalized through producer_free, so sharing the raw
	 * handle pointers would give them two owners.
	 * NOTE(review): presumably producer_free destroys rk/rkt -- confirm. Any
	 * other owned pointers in the struct (e.g. `error`, `brokers`) are still
	 * shared after the MEMCPY and deserve the same scrutiny.
	 */
	copy_config->rk = NULL;
	copy_config->rkt = NULL;
	copy_config->isInitialized = 0;
	copy_config->isConnected = 0;
	copy_config->isErrored = 0;

	return copy;
}

#metadata(topicStr, timeout) ⇒ Object

producer_metadata

make a metadata request to the kafka server, returning a hash containing a list of brokers and topics.

Parameters:

  • data

    struct rd_kafka_metadata* data returned from rd_kafka_metadata



873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
# File 'ext/hermann/hermann_rdkafka.c', line 873

/**
 * producer_metadata
 *
 * Make a metadata request through producer_metadata_request() and convert
 * the returned metadata into a Ruby object with
 * producer_metadata_make_hash().
 *
 * @param   self        VALUE   the Ruby producer instance
 * @param   topicStr    VALUE   a Ruby String naming a topic, or nil to make
 *                              the request without a topic handle
 * @param   timeout     VALUE   a Ruby number converted with rb_num2int and
 *                              used as the request's timeout_ms
 *
 * @return  the value built by producer_metadata_make_hash()
 *
 * Raises RuntimeError when the topic handle cannot be created or the
 * metadata request fails, TypeError when topicStr is neither nil nor a
 * String, and ArgumentError when topicStr contains an embedded NUL byte.
 */
static VALUE producer_metadata(VALUE self, VALUE topicStr, VALUE timeout) {
	HermannInstanceConfig *producerConfig;
	rd_kafka_resp_err_t err;
	hermann_metadata_ctx_t md_context;
	VALUE result;

	Data_Get_Struct(self, HermannInstanceConfig, producerConfig);

	if (!producerConfig->isInitialized) {
		producer_init_kafka(self, producerConfig);
	}

	md_context.rk = producerConfig->rk;
	md_context.topic = NULL;
	md_context.data = NULL;
	/* Convert before creating the topic handle: rb_num2int may raise */
	md_context.timeout_ms = rb_num2int(timeout);

	if ( !NIL_P(topicStr) ) {
		Check_Type(topicStr, T_STRING);
		md_context.topic = rd_kafka_topic_new(producerConfig->rk, StringValueCStr(topicStr), NULL);

		if ( NULL == md_context.topic ) {
			rb_raise( rb_eRuntimeError, "Could not construct a topic structure" );
		}
	}

	err = producer_metadata_request(&md_context);

	/* The topic handle was created only for this request. Release it here,
	 * before any rb_raise below, because rb_raise does not return.
	 */
	if ( NULL != md_context.topic ) {
		rd_kafka_topic_destroy(md_context.topic);
		md_context.topic = NULL;
	}

	if ( err != RD_KAFKA_RESP_ERR_NO_ERROR ) {
		if ( md_context.data ) {
			rd_kafka_metadata_destroy(md_context.data);
		}
		// annoyingly, this is always a timeout error -- the rest rdkafka just jams onto STDERR
		rb_raise( rb_eRuntimeError, "%s", rd_kafka_err2str(err) );
	}

	result = producer_metadata_make_hash(md_context.data);
	rd_kafka_metadata_destroy(md_context.data);

	return result;
}

#push_single(message, topic, partition_key, result) ⇒ Object

producer_push_single

Push a single message onto the given Kafka topic.

Parameters:

  • self

    VALUE the Ruby producer instance

  • message

    VALUE the ruby String containing the outgoing message.

  • topic

    VALUE the ruby String containing the topic to use for the outgoing message.

  • key

    VALUE the ruby String containing the key to partition by

  • result

    VALUE the Hermann::Result object to be fulfilled when the push completes



607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
# File 'ext/hermann/hermann_rdkafka.c', line 607

/**
 * producer_push_single
 *
 * Hand a single message to rd_kafka_produce() for the given topic. The
 * payload is passed with RD_KAFKA_MSG_F_COPY. A heap-allocated
 * hermann_push_ctx_t carrying the producer configuration and (when non-nil)
 * `result` is passed along as the per-message opaque pointer.
 *
 * @param   self            VALUE   the Ruby producer instance
 * @param   message         VALUE   the ruby String containing the outgoing message.
 * @param   topic           VALUE   the ruby String containing the topic to use for the
 *                                  outgoing message.
 * @param   partition_key   VALUE   the ruby String containing the key to partition by;
 *                                  nil is treated like an empty key
 * @param   result          VALUE   the Hermann::Result object to attach to the delivery
 *                                  context, or nil
 *
 * @return  self
 *
 * Raises ArgumentError when topic is nil, empty or contains a NUL byte,
 * TypeError when topic/message/partition_key are not Strings, RuntimeError
 * when the topic handle cannot be created and NoMemoryError when the
 * delivery context cannot be allocated. A failing rd_kafka_produce() is
 * only reported on stderr.
 */
static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE partition_key, VALUE result) {

	HermannInstanceConfig* producerConfig;
	/* Context pointer, pointing to `result`, for the librdkafka delivery
	 * callback
	 */
	hermann_push_ctx_t *delivery_ctx = NULL;
	rd_kafka_topic_t *rkt = NULL;
	rd_kafka_topic_conf_t *rkt_conf = NULL;
	const char *topic_name = NULL;
	const char *key_ptr = "";
	size_t key_len = 0;
	int produce_errno = 0;

	TRACER("self: %p, message: %p, result: %p)\n", (void *)self, (void *)message, (void *)result);

	Data_Get_Struct(self, HermannInstanceConfig, producerConfig);

	TRACER("producerConfig: %p\n", producerConfig);

	/* Validate every argument before acquiring any native resource: rb_raise
	 * does not return, so anything allocated earlier would be leaked.
	 */
	if (NIL_P(topic)) {
		rb_raise(rb_eArgError, "Topic cannot be empty");
	}
	Check_Type(topic, T_STRING);
	if (0 == RSTRING_LEN(topic)) {
		rb_raise(rb_eArgError, "Topic cannot be empty");
	}
	/* The topic name is handed over as a C string */
	topic_name = StringValueCStr(topic);

	Check_Type(message, T_STRING);

	if (!NIL_P(partition_key)) {
		Check_Type(partition_key, T_STRING);
	}

	if (!producerConfig->isInitialized) {
		producer_init_kafka(self, producerConfig);
	}

	TRACER("kafka initialized\n");

	/* Topic configuration */
	rkt_conf = rd_kafka_topic_conf_new();

	/* Set the partitioner callback */
	rd_kafka_topic_conf_set_partitioner_cb(rkt_conf, producer_partitioner_callback);

	rkt = rd_kafka_topic_new(producerConfig->rk,
								topic_name,
								rkt_conf);

	if (NULL == rkt) {
		rb_raise(rb_eRuntimeError, "Could not construct a topic structure");
	}

	delivery_ctx = malloc(sizeof *delivery_ctx);

	if (NULL == delivery_ctx) {
		rd_kafka_topic_destroy(rkt);
		rb_raise(rb_eNoMemError, "Could not allocate a delivery context");
	}

	delivery_ctx->producer = producerConfig;
	delivery_ctx->result = (VALUE) NULL;

	/* Only pass result through if it's non-nil */
	if (Qnil != result) {
		delivery_ctx->result = result;
		TRACER("setting result: %p\n", (void *)result);
	}

	if (!NIL_P(partition_key)) {
		key_ptr = RSTRING_PTR(partition_key);
		key_len = (size_t)RSTRING_LEN(partition_key);
	}

	TRACER("rd_kafka_produce() message of %i bytes\n", (int)RSTRING_LEN(message));

	/* Send/Produce message. */
	if (-1 == rd_kafka_produce(rkt,
						 producerConfig->partition,
						 RD_KAFKA_MSG_F_COPY,
						 RSTRING_PTR(message),
						 RSTRING_LEN(message),
						 key_ptr,
						 key_len,
						 delivery_ctx)) {
		/* Grab errno right away, before any other call can overwrite it */
		produce_errno = errno;

		/* Report against the handle we actually produced to */
		fprintf(stderr, "%% Failed to produce to topic %s partition %i: %s\n",
					rd_kafka_topic_name(rkt), producerConfig->partition,
					rd_kafka_err2str(rd_kafka_errno2err(produce_errno)));

		/* The message was not enqueued, so the context is not handed back
		 * through a delivery report; release it here.
		 */
		free(delivery_ctx);
		delivery_ctx = NULL;
		/* TODO: raise a Ruby exception here, requires a test though */
	}

	rd_kafka_topic_destroy(rkt);

	TRACER("returning\n");

	return self;
}

#tick(timeout) ⇒ Object

producer_tick

This function is responsible for ticking the librdkafka reactor so we can get feedback from the librdkafka threads back into the Ruby environment

@param  self	VALUE   the Ruby producer instance
@param  timeout VALUE   A Ruby FixNum of how many ms we should wait on librdkafka


694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
# File 'ext/hermann/hermann_rdkafka.c', line 694

/**
 * producer_tick
 *
 * Poll librdkafka once with rd_kafka_poll() so that feedback from the
 * librdkafka threads can reach the Ruby environment.
 *
 * @param   self    VALUE   the Ruby producer instance
 * @param   timeout VALUE   a Ruby number of how many ms we should wait on librdkafka
 *
 * @return  the value returned by rd_kafka_poll(), as a Ruby Integer
 *
 * Raises ArgumentError when timeout is nil, RuntimeError when the producer
 * has not been initialized yet, and StandardError when the configuration is
 * flagged as errored after polling.
 */
static VALUE producer_tick(VALUE self, VALUE timeout) {
	hermann_conf_t *conf = NULL;
	long timeout_ms = 0;
	int events = 0;

	if (NIL_P(timeout)) {
		rb_raise(rb_eArgError, "Cannot call `tick` with a nil timeout!\n");
	}

	timeout_ms = rb_num2int(timeout);

	Data_Get_Struct(self, hermann_conf_t, conf);

	/*
	 * if the producerConfig is not initialized then we never properly called
	 * producer_push_single, so why are we ticking?
	 */
	if (!conf->isInitialized) {
		rb_raise(rb_eRuntimeError, "Cannot call `tick` without having ever sent a message\n");
	}

	/* rb_num2int has already range-checked the value against int */
	events = rd_kafka_poll(conf->rk, (int)timeout_ms);

	if (conf->isErrored) {
		/* isErrored can be set without an accompanying message (e.g. by a
		 * failed connect), so never hand a NULL pointer to "%s".
		 */
		rb_raise(rb_eStandardError, "%s",
				 (NULL != conf->error) ? conf->error : "Unknown librdkafka error");
	}

	return rb_int_new(events);
}