Class: Hermann::Consumer
- Inherits:
-
Object
- Object
- Hermann::Consumer
- Defined in:
- ext/hermann_lib.c
Instance Method Summary collapse
-
#consume ⇒ Object
Hermann::Consumer.consume.
-
#initialize(topic, brokers, partition) ⇒ Object
constructor
consumer_initialize.
-
#initialize_copy(orig) ⇒ Object
consumer_init_copy.
Constructor Details
#initialize(topic, brokers, partition) ⇒ Object
consumer_initialize
TODO: configure the brokers through the passed parameter for now; later, through ZooKeeper (zk).
Set up the Consumer’s HermannInstanceConfig context.
661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 |
# File 'ext/hermann_lib.c', line 661 static VALUE consumer_initialize(VALUE self, VALUE topic, VALUE brokers, VALUE partition) { HermannInstanceConfig* consumerConfig; char* topicPtr; char* brokersPtr; int partitionNo; #ifdef TRACE fprintf(stderr, "consumer_initialize\n"); #endif topicPtr = StringValuePtr(topic); brokersPtr = StringValuePtr(brokers); partitionNo = FIX2INT(partition); Data_Get_Struct(self, HermannInstanceConfig, consumerConfig); consumerConfig->topic = topicPtr; consumerConfig->brokers = brokersPtr; consumerConfig->partition = partitionNo; consumerConfig->run = 1; consumerConfig->exit_eof = 0; consumerConfig->quiet = 0; return self; } |
Instance Method Details
#consume ⇒ Object
Hermann::Consumer.consume
Begin listening on the configured topic for messages. msg_consume will be called on each message received.
359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 |
# File 'ext/hermann_lib.c', line 359 static VALUE consumer_consume(VALUE self) { HermannInstanceConfig* consumerConfig; #ifdef TRACE fprintf(stderr, "consumer_consume"); #endif Data_Get_Struct(self, HermannInstanceConfig, consumerConfig); if(consumerConfig->topic==NULL) { fprintf(stderr, "Topic is null!"); return; } if(!consumerConfig->isInitialized) { consumer_init_kafka(consumerConfig); } /* Start consuming */ if (rd_kafka_consume_start(consumerConfig->rkt, consumerConfig->partition, consumerConfig->start_offset) == -1){ fprintf(stderr, "%% Failed to start consuming: %s\n", rd_kafka_err2str(rd_kafka_errno2err(errno))); exit(1); } #ifdef RB_THREAD_BLOCKING_REGION /** The consumer will listen for incoming messages in a loop, timing out and checking the consumerConfig->run * flag every second. * * Call rb_thread_blocking_region to release the GVM lock and allow Ruby to amuse itself while we wait on * IO from Kafka. * * If Ruby needs to interrupt the consumer loop, the stop callback will be invoked and the loop should exit. */ rb_thread_blocking_region(consumer_consume_loop, consumerConfig, consumer_consume_stop_callback, consumerConfig); #else consumer_consume_loop(consumerConfig); #endif /* Stop consuming */ rd_kafka_consume_stop(consumerConfig->rkt, consumerConfig->partition); return Qnil; } |
#initialize_copy(orig) ⇒ Object
consumer_init_copy
When copying into a new instance of a Consumer, reproduce the configuration info.
696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 |
# File 'ext/hermann_lib.c', line 696 static VALUE consumer_init_copy(VALUE copy, VALUE orig) { HermannInstanceConfig* orig_config; HermannInstanceConfig* copy_config; #ifdef TRACE fprintf(stderr, "consumer_init_copy\n"); #endif if(copy == orig) { return copy; } if (TYPE(orig) != T_DATA || RDATA(orig)->dfree != (RUBY_DATA_FUNC)consumer_free) { rb_raise(rb_eTypeError, "wrong argument type"); } Data_Get_Struct(orig, HermannInstanceConfig, orig_config); Data_Get_Struct(copy, HermannInstanceConfig, copy_config); // Copy over the data from one struct to the other MEMCPY(copy_config, orig_config, HermannInstanceConfig, 1); return copy; } |