Class: Karafka::Pro::Iterator

Inherits:
Object
  • Object
Defined in:
lib/karafka/pro/iterator.rb,
lib/karafka/pro/iterator/expander.rb,
lib/karafka/pro/iterator/tpl_builder.rb

Overview

The topic iterator lets you iterate over topic/partition data and look up the information you need.

It supports stopping early once the requested data is found, and it can also read through to the end of the data. It can also signal that a given message should be the last one taken from a particular partition while iteration continues over the other partitions.

It does not create a consumer group and does not have any offset management.

Defined Under Namespace

Classes: Expander, TplBuilder

Instance Method Summary

Constructor Details

#initialize(topics, settings: { 'auto.offset.reset': 'beginning' }, yield_nil: false, max_wait_time: 200) ⇒ Iterator

Note:

Keep in mind that this API must also stay within the `max.poll.interval.ms` limit on each iteration.

Note:

For a never-ending iterator, set `enable.partition.eof` to `false` so that polling does not stop when the end of a partition is reached (the end as of that moment).

A simple API for iterating over topic/partition data without having to subscribe and deal with rebalances. It supports multi-partition streaming and is optimized for data lookups: iteration over any partition can be stopped explicitly while the iteration is in progress.

Parameters:

  • topics (Array<String>, Hash)

    list of topic names if we want to iterate over multiple topics and all of their partitions, or a hash where keys are the topic names and values are hashes mapping partitions to their initial offsets.

  • settings (Hash) (defaults to: { 'auto.offset.reset': 'beginning' })

    extra settings for the consumer. Keep in mind that if you overwrite them, you may want to include `auto.offset.reset` with a value that matches your case.

  • yield_nil (Boolean) (defaults to: false)

    whether to also yield `nil` values when a poll returns nothing. Useful in particular for long-living iterators.

  • max_wait_time (Integer) (defaults to: 200)

    max wait in ms when iterator did not receive any messages
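The parameters above can be sketched as plain Ruby values. This is only an illustration: the topic names `orders` and `payments`, the offsets, and the `max_wait_time` of 1000 ms are hypothetical, and the partition count assumes the expanded topics have the same shape as the hash form. Because `settings:` is a keyword argument with a default, passing your own hash replaces the default entirely, which is why the sketch merges into the default instead of starting from scratch.

```ruby
# Form 1: a list of topic names (all partitions of each topic).
# Topic names here are hypothetical.
topics_as_list = %w[orders payments]

# Form 2: topic => { partition => initial offset }.
topics_as_hash = {
  'orders' => { 0 => 100, 1 => 250 },
  'payments' => { 0 => 0 }
}

# The same expression the constructor applies to the expanded topics,
# assuming they have the shape of the hash form above.
total_partitions = topics_as_hash.map(&:last).sum(&:count)

# Start from the documented default so 'auto.offset.reset' is not lost,
# then disable partition EOF for a long-living iterator.
default_settings = { 'auto.offset.reset': 'beginning' }
long_living_settings = default_settings.merge('enable.partition.eof': false)

# Keyword arguments that could be passed along with one of the topic forms.
iterator_options = {
  settings: long_living_settings,
  yield_nil: true,      # also yield nil when a poll returns nothing
  max_wait_time: 1_000  # hypothetical value, in ms
}

puts total_partitions
puts iterator_options.inspect
```

In an application these values would be passed as `Karafka::Pro::Iterator.new(topics_as_hash, **iterator_options)`.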



# File 'lib/karafka/pro/iterator.rb', line 49

def initialize(
  topics,
  settings: { 'auto.offset.reset': 'beginning' },
  yield_nil: false,
  max_wait_time: 200
)
  @topics_with_partitions = Expander.new.call(topics)

  @routing_topics = @topics_with_partitions.map do |name, _|
    [name, ::Karafka::Routing::Router.find_or_initialize_by_name(name)]
  end.to_h

  @total_partitions = @topics_with_partitions.map(&:last).sum(&:count)

  @stopped_partitions = 0

  @settings = settings
  @yield_nil = yield_nil
  @max_wait_time = max_wait_time
end

Instance Method Details

#each ⇒ Object

Iterates over the requested topic partitions and yields each result together with the iterator itself. The iterator instance is yielded so that `stop_partition` can be called to stop iterating over part of the data. This is useful when looking for some information across all partitions: once it is found in a given partition, that partition's data is no longer needed and would only consume resources.



# File 'lib/karafka/pro/iterator.rb', line 75

def each
  Admin.with_consumer(@settings) do |consumer|
    tpl = TplBuilder.new(consumer, @topics_with_partitions).call
    consumer.assign(tpl)

    # We need this for self-referenced APIs like pausing
    @current_consumer = consumer

    # Stream data until we reach the end of all the partitions or until the end user
    # indicates that they are done
    until done?
      message = poll

      # Skip nils if not explicitly required
      next if message.nil? && !@yield_nil

      if message
        @current_message = build_message(message)

        yield(@current_message, self)
      else
        yield(nil, self)
      end
    end

    @current_message = nil
    @current_consumer = nil
  end

  # Reset so we can use the same iterator again if needed
  @stopped_partitions = 0
end
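
A lookup that stops each partition once a match is found could be sketched as follows. The helper relies only on what this page documents: `each` yields the message together with the iterator, and `stop_current_partition` stops the partition of that message. The names `first_matches`, `InMemoryIterator`, `Message`, and the `events` topic are hypothetical. The in-memory class is a stand-in so the sketch runs without a Kafka cluster; it does not reflect how Karafka polls.

```ruby
require 'set'

# Hypothetical message shape exposing the readers used below.
Message = Struct.new(:topic, :partition, :offset, :payload)

# Hypothetical stand-in that mimics the documented contract:
# `each` yields (message, iterator) and partitions can be stopped.
class InMemoryIterator
  # data: topic => { partition => [payloads] }
  def initialize(data)
    @data = data
  end

  def each
    @stopped = Set.new
    cursors = Hash.new(0)
    keys = @data.flat_map { |topic, parts| parts.keys.map { |part| [topic, part] } }

    loop do
      progressed = false

      keys.each do |key|
        topic, partition = key
        offset = cursors[key]
        next if @stopped.include?(key) || offset >= @data[topic][partition].size

        cursors[key] += 1
        progressed = true
        @current = Message.new(topic, partition, offset, @data[topic][partition][offset])
        yield(@current, self)
      end

      # Finish once every partition is either stopped or exhausted
      break unless progressed
    end
  end

  def stop_partition(name, partition)
    @stopped << [name, partition]
  end

  def stop_current_partition
    stop_partition(@current.topic, @current.partition)
  end
end

# Returns { [topic, partition] => offset } of the first matching message
# per partition, stopping each partition as soon as its match is found.
def first_matches(iterator)
  found = {}

  iterator.each do |message, iter|
    next if message.nil? # possible when yield_nil is enabled
    next unless yield(message)

    found[[message.topic, message.partition]] = message.offset
    iter.stop_current_partition
  end

  found
end

data = { 'events' => { 0 => %w[a b target c], 1 => %w[target x y] } }
visited = 0

found = first_matches(InMemoryIterator.new(data)) do |message|
  visited += 1
  message.payload == 'target'
end

puts found.inspect
puts visited
```

Since the helper only duck-types on `each` and `stop_current_partition`, the same function could be given a `Karafka::Pro::Iterator` instance in an application. Messages after the match in a stopped partition are never visited, which is the point of stopping early.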

#stop_current_partition ⇒ Object

Stops the partition of the message that is currently being yielded.



# File 'lib/karafka/pro/iterator.rb', line 109

def stop_current_partition
  stop_partition(
    @current_message.topic,
    @current_message.partition
  )
end

#stop_partition(name, partition) ⇒ Object

Stops processing of a given partition. The partition must be provided explicitly to cover multi-partition iteration, where we may want to stop a partition other than the one that is currently being yielded.

We pause it forever and no longer work with it.

Parameters:

  • name (String)

    name of the topic whose partition we want to stop

  • partition (Integer)

    partition we want to stop processing



# File 'lib/karafka/pro/iterator.rb', line 125

def stop_partition(name, partition)
  @stopped_partitions += 1

  @current_consumer.pause(
    Rdkafka::Consumer::TopicPartitionList.new(
      name => [Partition.new(partition, 0)]
    )
  )
end
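
In the source above, `stop_partition` increments `@stopped_partitions` on every call, with no check for whether that partition was already stopped. How the counter is compared against the total is not shown on this page, so whether a repeated call causes problems is an assumption. A caller that wants to be safe could forward each stop only once. A minimal sketch with hypothetical names (`StopGuard`, `RecordingIterator`, the `events` topic):

```ruby
require 'set'

# Hypothetical caller-side helper, not part of Karafka.
# Forwards each (topic, partition) stop to the iterator at most once.
class StopGuard
  def initialize(iterator)
    @iterator = iterator
    @stopped = Set.new
  end

  # Returns true when the stop was forwarded, false when it was a repeat.
  def stop(topic, partition)
    # Set#add? returns nil when the element is already present
    return false unless @stopped.add?([topic, partition])

    @iterator.stop_partition(topic, partition)
    true
  end
end

# Hypothetical test double that records the calls it receives,
# standing in for an object that responds to stop_partition(name, partition).
class RecordingIterator
  attr_reader :calls

  def initialize
    @calls = []
  end

  def stop_partition(name, partition)
    @calls << [name, partition]
  end
end

recorder = RecordingIterator.new
guard = StopGuard.new(recorder)

results = [
  guard.stop('events', 0),
  guard.stop('events', 0), # repeat, not forwarded
  guard.stop('events', 1)
]

puts results.inspect
puts recorder.calls.inspect
```

The guard keeps its own set of stopped partitions, so it would need to be recreated for each new `each` run.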