Class: Hermann::Lib::Producer
- Inherits:
-
Object
- Object
- Hermann::Lib::Producer
- Defined in:
- ext/hermann/hermann_lib.c
Instance Method Summary collapse
-
#connect(timeout) ⇒ Object
Producer.connect.
-
#connected? ⇒ Boolean
Producer.connected?.
-
#errored? ⇒ Boolean
Producer.errored?.
-
#initialize(brokers) ⇒ Object
constructor
producer_initialize.
-
#initialize_copy(orig) ⇒ Object
producer_init_copy.
-
#push_single(message, topic, result) ⇒ Object
producer_push_single.
-
#tick(timeout) ⇒ Object
producer_tick.
Constructor Details
#initialize(brokers) ⇒ Object
producer_initialize
Set up the configuration context for the Producer instance
971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 |
# File 'ext/hermann/hermann_lib.c', line 971
static VALUE producer_initialize(VALUE self,
VALUE brokers) {
HermannInstanceConfig* producerConfig;
char* topicPtr;
char* brokersPtr;
TRACER("initialize Producer ruby object\n");
brokersPtr = StringValuePtr(brokers);
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
producerConfig->brokers = brokersPtr;
/** Using RD_KAFKA_PARTITION_UA specifies we want the partitioner callback to be called to determine the target
* partition
*/
producerConfig->partition = RD_KAFKA_PARTITION_UA;
producerConfig->run = 1;
producerConfig->exit_eof = 0;
producerConfig->quiet = 0;
return self;
}
|
Instance Method Details
#connect(timeout) ⇒ Object
Producer.connect
676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 |
# File 'ext/hermann/hermann_lib.c', line 676
static VALUE producer_connect(VALUE self, VALUE timeout) {
HermannInstanceConfig *producerConfig;
rd_kafka_resp_err_t err;
VALUE result = Qfalse;
int timeout_ms = rb_num2int(timeout);
struct rd_kafka_metadata *data = NULL;
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
if (!producerConfig->isInitialized) {
producer_init_kafka(self, producerConfig);
}
err = rd_kafka_metadata(producerConfig->rk,
0,
producerConfig->rkt,
&data,
timeout_ms);
TRACER("err: %s (%i)\n", rd_kafka_err2str(err), err);
if (RD_KAFKA_RESP_ERR_NO_ERROR == err) {
TRACER("brokers: %i, topics: %i\n",
data->broker_cnt,
data->topic_cnt);
producerConfig->isConnected = 1;
result = Qtrue;
}
else {
producerConfig->isErrored = err;
}
rd_kafka_metadata_destroy(data);
return result;
}
|
#connected? ⇒ Boolean
Producer.connected?
712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 |
# File 'ext/hermann/hermann_lib.c', line 712
static VALUE producer_is_connected(VALUE self) {
HermannInstanceConfig *producerConfig;
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
if (!producerConfig->isInitialized) {
return Qfalse;
}
if (!producerConfig->isConnected) {
return Qfalse;
}
return Qtrue;
}
|
#errored? ⇒ Boolean
Producer.errored?
728 729 730 731 732 733 734 735 736 737 738 |
# File 'ext/hermann/hermann_lib.c', line 728
static VALUE producer_is_errored(VALUE self) {
HermannInstanceConfig *producerConfig;
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
if (producerConfig->isErrored) {
return Qtrue;
}
return Qfalse;
}
|
#initialize_copy(orig) ⇒ Object
producer_init_copy
Copy the configuration information from orig into copy for the given Producer instances.
1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 |
# File 'ext/hermann/hermann_lib.c', line 1003
static VALUE producer_init_copy(VALUE copy,
VALUE orig) {
HermannInstanceConfig* orig_config;
HermannInstanceConfig* copy_config;
if (copy == orig) {
return copy;
}
if (TYPE(orig) != T_DATA || RDATA(orig)->dfree != (RUBY_DATA_FUNC)producer_free) {
rb_raise(rb_eTypeError, "wrong argument type");
}
Data_Get_Struct(orig, HermannInstanceConfig, orig_config);
Data_Get_Struct(copy, HermannInstanceConfig, copy_config);
// Copy over the data from one struct to the other
MEMCPY(copy_config, orig_config, HermannInstanceConfig, 1);
return copy;
}
|
#push_single(message, topic, result) ⇒ Object
producer_push_single
push completes
564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 |
# File 'ext/hermann/hermann_lib.c', line 564
static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE result) {
HermannInstanceConfig* producerConfig;
/* Context pointer, pointing to `result`, for the librdkafka delivery
* callback
*/
hermann_push_ctx_t *delivery_ctx = (hermann_push_ctx_t *)malloc(sizeof(hermann_push_ctx_t));
rd_kafka_topic_t *rkt = NULL;
TRACER("self: %p, message: %p, result: %p)\n", self, message, result);
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
delivery_ctx->producer = producerConfig;
delivery_ctx->result = NULL;
TRACER("producerConfig: %p\n", producerConfig);
if ((Qnil == topic) ||
(0 == RSTRING_LEN(topic))) {
rb_raise(rb_eArgError, "Topic cannot be empty");
return self;
}
if (!producerConfig->isInitialized) {
producer_init_kafka(self, producerConfig);
}
TRACER("kafka initialized\n");
rkt = rd_kafka_topic_new(producerConfig->rk,
RSTRING_PTR(topic),
NULL);
if (NULL == rkt) {
rb_raise(rb_eRuntimeError, "Could not construct a topic structure");
return self;
}
/* Only pass result through if it's non-nil */
if (Qnil != result) {
delivery_ctx->result = result;
TRACER("setting result: %p\n", result);
}
TRACER("rd_kafka_produce() message of %i bytes\n", RSTRING_LEN(message));
/* Send/Produce message. */
if (-1 == rd_kafka_produce(rkt,
producerConfig->partition,
RD_KAFKA_MSG_F_COPY,
RSTRING_PTR(message),
RSTRING_LEN(message),
NULL,
0,
delivery_ctx)) {
fprintf(stderr, "%% Failed to produce to topic %s partition %i: %s\n",
rd_kafka_topic_name(producerConfig->rkt), producerConfig->partition,
rd_kafka_err2str(rd_kafka_errno2err(errno)));
/* TODO: raise a Ruby exception here, requires a test though */
}
if (NULL != rkt) {
rd_kafka_topic_destroy(rkt);
}
TRACER("returning\n");
return self;
}
|
#tick(timeout) ⇒ Object
producer_tick
This function is responsible for ticking the librdkafka reactor so we can get feedback from the librdkafka threads back into the Ruby environment
@param self VALUE the Ruby producer instance
@param message VALUE A Ruby FixNum of how many ms we should wait on librdkafka
644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 |
# File 'ext/hermann/hermann_lib.c', line 644
static VALUE producer_tick(VALUE self, VALUE timeout) {
hermann_conf_t *conf = NULL;
long timeout_ms = 0;
int events = 0;
if (Qnil != timeout) {
timeout_ms = rb_num2int(timeout);
}
else {
rb_raise(rb_eArgError, "Cannot call `tick` with a nil timeout!\n");
}
Data_Get_Struct(self, hermann_conf_t, conf);
/*
* if the producerConfig is not initialized then we never properly called
* producer_push_single, so why are we ticking?
*/
if (!conf->isInitialized) {
rb_raise(rb_eRuntimeError, "Cannot call `tick` without having ever sent a message\n");
}
events = rd_kafka_poll(conf->rk, timeout_ms);
if (conf->isErrored) {
rb_raise(rb_eStandardError, conf->error);
}
return rb_int_new(events);
}
|