Class: Kafka::Fetcher

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/fetcher.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(cluster:, logger:, instrumenter:, max_queue_size:, group:) ⇒ Fetcher

Returns a new instance of Fetcher


9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/kafka/fetcher.rb', line 9

# Sets up a fetcher with empty result/command queues and default fetch limits.
# No thread is started here; see #start.
#
# @param cluster stored unchanged in @cluster
# @param logger wrapped in a TaggedLogger before being stored
# @param instrumenter stored unchanged in @instrumenter
# @param max_queue_size stored unchanged in @max_queue_size
# @param group stored unchanged in @group (interpolated into log output by #start)
def initialize(cluster:, logger:, instrumenter:, max_queue_size:, group:)
  @cluster = cluster
  @logger = TaggedLogger.new(logger)
  @instrumenter = instrumenter
  @max_queue_size = max_queue_size
  @group = group

  # Results handed out by #poll.
  @queue = Queue.new

  # Commands enqueued by #subscribe, #seek, #configure, #reset and #stop.
  @commands = Queue.new

  # Two-level hash: reading a missing outer key stores and returns a fresh
  # inner hash. Presumably topic => { partition => offset } -- confirm
  # against the code that fills it.
  @next_offsets = Hash.new { |table, key| table[key] = {} }

  # Minimum number of bytes a long poll waits for before returning.
  @min_bytes = 1

  # Upper bound, in seconds, on how long a long poll may wait.
  @max_wait_time = 1

  # Cap on the bytes requested by a single fetch request (10 MiB).
  @max_bytes = 10_485_760

  # Per-partition byte cap, keyed by topic.
  @max_bytes_per_partition = {}

  # Incrementing counter that lets the foreground and background threads
  # agree on which buffered results predate a reset.
  @current_reset_counter = 0
end

Instance Attribute Details

#queue ⇒ Object (readonly)

Returns the value of attribute queue


7
8
9
# File 'lib/kafka/fetcher.rb', line 7

# Read-only accessor: returns the object held in @queue itself, not a copy.
def queue
  @queue
end

Instance Method Details

#configure(min_bytes:, max_bytes:, max_wait_time:) ⇒ Object


45
46
47
# File 'lib/kafka/fetcher.rb', line 45

# Enqueues a :configure command carrying the new fetch settings. Nothing is
# applied here; whoever drains @commands is expected to act on it
# (presumably the fetcher thread -- confirm against the command handler).
#
# @param min_bytes first element of the command payload
# @param max_bytes second element of the command payload
# @param max_wait_time third element of the command payload
# @return the @commands object
def configure(min_bytes:, max_bytes:, max_wait_time:)
  settings = [min_bytes, max_bytes, max_wait_time]
  @commands.push([:configure, settings])
end

#data? ⇒ Boolean

Returns:

  • (Boolean)

74
75
76
# File 'lib/kafka/fetcher.rb', line 74

# Reports whether @queue currently holds at least one entry.
#
# @return [Boolean] false when @queue is empty, true otherwise
def data?
  buffer_empty = @queue.empty?
  !buffer_empty
end

#poll ⇒ Object


78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/kafka/fetcher.rb', line 78

# Takes the next entry off @queue and returns it as a [tag, payload] pair.
#
# Each :batches entry carries the reset counter value that was current when
# it was fetched. A non-empty batch whose counter is lower than the current
# one was fetched before the latest reset, so its payload is replaced by an
# empty array and a warning is logged.
#
# @return [Array] two elements: the entry's tag and its (possibly emptied) payload
def poll
  tag, payload, fetched_at_reset = @queue.deq

  stale = tag == :batches && payload.any? && current_reset_counter > fetched_at_reset
  return [tag, payload] unless stale

  @logger.warn "Skipping stale messages buffered prior to reset"
  [tag, []]
end

#reset ⇒ Object


69
70
71
72
# File 'lib/kafka/fetcher.rb', line 69

# Bumps the reset counter, so that #poll treats already-buffered batches as
# stale, then enqueues a :reset command.
#
# @return the @commands object
def reset
  bumped = current_reset_counter + 1
  @current_reset_counter = bumped
  @commands.push([:reset])
end

#seek(topic, partition, offset) ⇒ Object


41
42
43
# File 'lib/kafka/fetcher.rb', line 41

# Enqueues a :seek command for the given position. The position is not
# changed here; that is left to whoever drains @commands.
#
# @param topic first element of the command payload
# @param partition second element of the command payload
# @param offset third element of the command payload
# @return the @commands object
def seek(topic, partition, offset)
  target = [topic, partition, offset]
  @commands.push([:seek, target])
end

#start ⇒ Object


49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/kafka/fetcher.rb', line 49

# Starts the fetcher thread unless @running is already set.
#
# The thread calls `loop` repeatedly for as long as @running stays truthy,
# then logs that it has exited. NOTE(review): `loop` is called without a
# block, so it is presumably this class's own per-iteration method rather
# than Kernel#loop -- confirm against the rest of the file.
#
# @return [true, nil] true after spawning the thread, nil if already running
def start
  return if @running

  @running = true

  @thread = Thread.new do
    loop while @running
    @logger.info "#{@group} Fetcher thread exited."
  end

  # An unhandled exception in the thread is re-raised in the main thread.
  @thread.abort_on_exception = true
end

#stop ⇒ Object


63
64
65
66
67
# File 'lib/kafka/fetcher.rb', line 63

# Asks the fetcher thread to stop and waits for it to finish. Does nothing
# when @running is not set.
#
# This method does not clear @running itself; presumably the handler for the
# :stop command does -- confirm against the command handler.
#
# @return [Thread, nil] the joined thread, or nil if the fetcher was not running
def stop
  return unless @running

  @commands.push([:stop, []])
  @thread.join
end

#subscribe(topic, max_bytes_per_partition:) ⇒ Object


37
38
39
# File 'lib/kafka/fetcher.rb', line 37

# Enqueues a :subscribe command for the topic together with its
# per-partition byte limit. The subscription is not applied here; that is
# left to whoever drains @commands.
#
# @param topic first element of the command payload
# @param max_bytes_per_partition second element of the command payload
# @return the @commands object
def subscribe(topic, max_bytes_per_partition:)
  subscription = [topic, max_bytes_per_partition]
  @commands.push([:subscribe, subscription])
end