Class: Hermann::Provider::RDKafka::Producer
- Inherits:
-
Object
- Object
- Hermann::Provider::RDKafka::Producer
- Defined in:
- ext/hermann/hermann_rdkafka.c
Instance Method Summary collapse
-
#connect(timeout) ⇒ Object
Producer.connect.
-
#connected? ⇒ Boolean
Producer.connected?.
-
#errored? ⇒ Boolean
Producer.errored?.
-
#initialize(brokers) ⇒ Object
constructor
producer_initialize.
-
#initialize_copy(orig) ⇒ Object
producer_init_copy.
-
#metadata(topicStr, timeout) ⇒ Object
producer_metadata.
-
#push_single(message, topic, result) ⇒ Object
producer_push_single.
-
#tick(timeout) ⇒ Object
producer_tick.
Constructor Details
#initialize(brokers) ⇒ Object
producer_initialize
Set up the configuration context for the Producer instance
1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 |
# File 'ext/hermann/hermann_rdkafka.c', line 1148
static VALUE producer_initialize(VALUE self,
VALUE brokers) {
HermannInstanceConfig* producerConfig;
char* topicPtr;
char* brokersPtr;
TRACER("initialize Producer ruby object\n");
brokersPtr = StringValuePtr(brokers);
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
producerConfig->brokers = brokersPtr;
/** Using RD_KAFKA_PARTITION_UA specifies we want the partitioner callback to be called to determine the target
* partition
*/
producerConfig->partition = RD_KAFKA_PARTITION_UA;
producerConfig->run = 1;
producerConfig->exit_eof = 0;
producerConfig->quiet = 0;
return self;
}
|
Instance Method Details
#connect(timeout) ⇒ Object
Producer.connect
745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 |
# File 'ext/hermann/hermann_rdkafka.c', line 745
static VALUE producer_connect(VALUE self, VALUE timeout) {
HermannInstanceConfig *producerConfig;
rd_kafka_resp_err_t err;
VALUE result = Qfalse;
hermann_metadata_ctx_t md_context;
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
if (!producerConfig->isInitialized) {
producer_init_kafka(self, producerConfig);
}
md_context.rk = producerConfig->rk;
md_context.topic = NULL;
md_context.data = NULL;
md_context.timeout_ms = rb_num2int(timeout);
err = producer_metadata_request(&md_context);
TRACER("err: %s (%i)\n", rd_kafka_err2str(err), err);
if (RD_KAFKA_RESP_ERR_NO_ERROR == err) {
TRACER("brokers: %i, topics: %i\n",
md_context.data->broker_cnt,
md_context.data->topic_cnt);
producerConfig->isConnected = 1;
result = Qtrue;
}
else {
producerConfig->isErrored = err;
}
if ( md_context.data )
rd_kafka_metadata_destroy(md_context.data);
return result;
}
|
#connected? ⇒ Boolean
Producer.connected?
889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 |
# File 'ext/hermann/hermann_rdkafka.c', line 889
static VALUE producer_is_connected(VALUE self) {
HermannInstanceConfig *producerConfig;
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
if (!producerConfig->isInitialized) {
return Qfalse;
}
if (!producerConfig->isConnected) {
return Qfalse;
}
return Qtrue;
}
|
#errored? ⇒ Boolean
Producer.errored?
905 906 907 908 909 910 911 912 913 914 915 |
# File 'ext/hermann/hermann_rdkafka.c', line 905
static VALUE producer_is_errored(VALUE self) {
HermannInstanceConfig *producerConfig;
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
if (producerConfig->isErrored) {
return Qtrue;
}
return Qfalse;
}
|
#initialize_copy(orig) ⇒ Object
producer_init_copy
Copy the configuration information from orig into copy for the given Producer instances.
1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 |
# File 'ext/hermann/hermann_rdkafka.c', line 1180
static VALUE producer_init_copy(VALUE copy,
VALUE orig) {
HermannInstanceConfig* orig_config;
HermannInstanceConfig* copy_config;
if (copy == orig) {
return copy;
}
if (TYPE(orig) != T_DATA || RDATA(orig)->dfree != (RUBY_DATA_FUNC)producer_free) {
rb_raise(rb_eTypeError, "wrong argument type");
}
Data_Get_Struct(orig, HermannInstanceConfig, orig_config);
Data_Get_Struct(copy, HermannInstanceConfig, copy_config);
// Copy over the data from one struct to the other
MEMCPY(copy_config, orig_config, HermannInstanceConfig, 1);
return copy;
}
|
#metadata(topicStr, timeout) ⇒ Object
producer_metadata
make a metadata request to the kafka server, returning a hash containing a list of brokers and topics.
854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 |
# File 'ext/hermann/hermann_rdkafka.c', line 854
static VALUE producer_metadata(VALUE self, VALUE topicStr, VALUE timeout) {
HermannInstanceConfig *producerConfig;
rd_kafka_resp_err_t err;
hermann_metadata_ctx_t md_context;
VALUE result;
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
if (!producerConfig->isInitialized) {
producer_init_kafka(self, producerConfig);
}
md_context.rk = producerConfig->rk;
md_context.timeout_ms = rb_num2int(timeout);
if ( !NIL_P(topicStr) ) {
Check_Type(topicStr, T_STRING);
md_context.topic = rd_kafka_topic_new(producerConfig->rk, StringValuePtr(topicStr), NULL);
} else {
md_context.topic = NULL;
}
err = producer_metadata_request(&md_context);
if ( err != RD_KAFKA_RESP_ERR_NO_ERROR ) {
// annoyingly, this is always a timeout error -- the rest rdkafka just jams onto STDERR
rb_raise( rb_eRuntimeError, "%s", rd_kafka_err2str(err) );
} else {
result = producer_metadata_make_hash(md_context.data);
rd_kafka_metadata_destroy(md_context.data);
return result;
}
}
|
#push_single(message, topic, result) ⇒ Object
producer_push_single
push completes
595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 |
# File 'ext/hermann/hermann_rdkafka.c', line 595
static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE result) {
HermannInstanceConfig* producerConfig;
/* Context pointer, pointing to `result`, for the librdkafka delivery
* callback
*/
hermann_push_ctx_t *delivery_ctx = (hermann_push_ctx_t *)malloc(sizeof(hermann_push_ctx_t));
rd_kafka_topic_t *rkt = NULL;
TRACER("self: %p, message: %p, result: %p)\n", self, message, result);
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
delivery_ctx->producer = producerConfig;
delivery_ctx->result = (VALUE) NULL;
TRACER("producerConfig: %p\n", producerConfig);
if ((Qnil == topic) ||
(0 == RSTRING_LEN(topic))) {
rb_raise(rb_eArgError, "Topic cannot be empty");
return self;
}
if (!producerConfig->isInitialized) {
producer_init_kafka(self, producerConfig);
}
TRACER("kafka initialized\n");
rkt = rd_kafka_topic_new(producerConfig->rk,
RSTRING_PTR(topic),
NULL);
if (NULL == rkt) {
rb_raise(rb_eRuntimeError, "Could not construct a topic structure");
return self;
}
/* Only pass result through if it's non-nil */
if (Qnil != result) {
delivery_ctx->result = result;
TRACER("setting result: %p\n", result);
}
TRACER("rd_kafka_produce() message of %i bytes\n", RSTRING_LEN(message));
/* Send/Produce message. */
if (-1 == rd_kafka_produce(rkt,
producerConfig->partition,
RD_KAFKA_MSG_F_COPY,
RSTRING_PTR(message),
RSTRING_LEN(message),
NULL,
0,
delivery_ctx)) {
fprintf(stderr, "%% Failed to produce to topic %s partition %i: %s\n",
rd_kafka_topic_name(producerConfig->rkt), producerConfig->partition,
rd_kafka_err2str(rd_kafka_errno2err(errno)));
/* TODO: raise a Ruby exception here, requires a test though */
}
if (NULL != rkt) {
rd_kafka_topic_destroy(rkt);
}
TRACER("returning\n");
return self;
}
|
#tick(timeout) ⇒ Object
producer_tick
This function is responsible for ticking the librdkafka reactor so we can get feedback from the librdkafka threads back into the Ruby environment
@param self VALUE the Ruby producer instance
@param message VALUE A Ruby FixNum of how many ms we should wait on librdkafka
675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 |
# File 'ext/hermann/hermann_rdkafka.c', line 675
static VALUE producer_tick(VALUE self, VALUE timeout) {
hermann_conf_t *conf = NULL;
long timeout_ms = 0;
int events = 0;
if (Qnil != timeout) {
timeout_ms = rb_num2int(timeout);
}
else {
rb_raise(rb_eArgError, "Cannot call `tick` with a nil timeout!\n");
}
Data_Get_Struct(self, hermann_conf_t, conf);
/*
* if the producerConfig is not initialized then we never properly called
* producer_push_single, so why are we ticking?
*/
if (!conf->isInitialized) {
rb_raise(rb_eRuntimeError, "Cannot call `tick` without having ever sent a message\n");
}
events = rd_kafka_poll(conf->rk, timeout_ms);
if (conf->isErrored) {
rb_raise(rb_eStandardError, "%s", conf->error);
}
return rb_int_new(events);
}
|