Class: Hermann::Consumer

Inherits:
Object
  • Object
show all
Defined in:
ext/hermann_lib.c

Instance Method Summary collapse

Constructor Details

#initialize(topic, brokers, partition) ⇒ Object

consumer_initialize

todo: configure the brokers through passed parameter, later through zk

Set up the Consumer’s HermannInstanceConfig context.

Parameters:

  • self

    VALUE the Ruby instance of the Consumer

  • topic

    VALUE a Ruby string

  • brokers

    VALUE a Ruby string containing list of host:port

  • partition

    VALUE a Ruby number



661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
# File 'ext/hermann_lib.c', line 661

static VALUE consumer_initialize(VALUE self, VALUE topic, VALUE brokers, VALUE partition) {

    HermannInstanceConfig* consumerConfig;
    char* topicPtr;
    char* brokersPtr;
    int partitionNo;

#ifdef TRACE
    fprintf(stderr, "consumer_initialize\n");
#endif

    topicPtr = StringValuePtr(topic);
    brokersPtr = StringValuePtr(brokers);
    partitionNo = FIX2INT(partition);
    Data_Get_Struct(self, HermannInstanceConfig, consumerConfig);

    consumerConfig->topic = topicPtr;
    consumerConfig->brokers = brokersPtr;
    consumerConfig->partition = partitionNo;
    consumerConfig->run = 1;
    consumerConfig->exit_eof = 0;
    consumerConfig->quiet = 0;

    return self;
}

Instance Method Details

#consumeObject

Hermann::Consumer.consume

Begin listening on the configured topic for messages. msg_consume will be called on each message received.

Parameters:

  • VALUE

    self the Ruby object for this consumer



359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
# File 'ext/hermann_lib.c', line 359

static VALUE consumer_consume(VALUE self) {

    HermannInstanceConfig* consumerConfig;

#ifdef TRACE
    fprintf(stderr, "consumer_consume");
#endif

    Data_Get_Struct(self, HermannInstanceConfig, consumerConfig);

    if(consumerConfig->topic==NULL) {
            fprintf(stderr, "Topic is null!");
            return;
    }

    if(!consumerConfig->isInitialized) {
        consumer_init_kafka(consumerConfig);
    }

    /* Start consuming */
    if (rd_kafka_consume_start(consumerConfig->rkt, consumerConfig->partition, consumerConfig->start_offset) == -1){
        fprintf(stderr, "%% Failed to start consuming: %s\n",
            rd_kafka_err2str(rd_kafka_errno2err(errno)));
        exit(1);
    }

#ifdef RB_THREAD_BLOCKING_REGION
    /** The consumer will listen for incoming messages in a loop, timing out and checking the consumerConfig->run
     *  flag every second.
     *
     *  Call rb_thread_blocking_region to release the GVM lock and allow Ruby to amuse itself while we wait on
     *  IO from Kafka.
     *
     *  If Ruby needs to interrupt the consumer loop, the stop callback will be invoked and the loop should exit.
     */
    rb_thread_blocking_region(consumer_consume_loop, consumerConfig, consumer_consume_stop_callback, consumerConfig);
#else
    consumer_consume_loop(consumerConfig);
#endif


    /* Stop consuming */
    rd_kafka_consume_stop(consumerConfig->rkt, consumerConfig->partition);

    return Qnil;
}

#initialize_copy(orig) ⇒ Object

consumer_init_copy

When copying into a new instance of a Consumer, reproduce the configuration info.

Parameters:

  • copy

    VALUE the Ruby Consumer instance (with configuration) as destination

  • orig

    VALUE the Ruby Consumer instance (with configuration) as source



696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
# File 'ext/hermann_lib.c', line 696

static VALUE consumer_init_copy(VALUE copy, VALUE orig) {
    HermannInstanceConfig* orig_config;
    HermannInstanceConfig* copy_config;

#ifdef TRACE
    fprintf(stderr, "consumer_init_copy\n");
#endif

    if(copy == orig) {
        return copy;
    }

    if (TYPE(orig) != T_DATA || RDATA(orig)->dfree != (RUBY_DATA_FUNC)consumer_free) {
        rb_raise(rb_eTypeError, "wrong argument type");
    }

    Data_Get_Struct(orig, HermannInstanceConfig, orig_config);
    Data_Get_Struct(copy, HermannInstanceConfig, copy_config);

    // Copy over the data from one struct to the other
    MEMCPY(copy_config, orig_config, HermannInstanceConfig, 1);

    return copy;
}