Class: Kafka::Fetcher

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/fetcher.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(cluster:, logger:, instrumenter:, max_queue_size:, group:) ⇒ Fetcher

Returns a new instance of Fetcher.



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/kafka/fetcher.rb', line 9

def initialize(cluster:, logger:, instrumenter:, max_queue_size:, group:)
  @cluster = cluster
  @logger = TaggedLogger.new(logger)
  @instrumenter = instrumenter
  @max_queue_size = max_queue_size
  @group = group

  @queue = Queue.new
  @commands = Queue.new
  @next_offsets = Hash.new { |h, k| h[k] = {} }

  # We are only running when someone calls start.
  @running = false

  # Long poll until at least this many bytes can be fetched.
  @min_bytes = 1

  # Long poll at most this number of seconds.
  @max_wait_time = 1

  # The maximum number of bytes to fetch for any given fetch request.
  @max_bytes = 10485760

  # The maximum number of bytes to fetch per partition, by topic.
  @max_bytes_per_partition = {}

  # An incrementing counter used to synchronize resets between the
  # foreground and background thread.
  @current_reset_counter = 0
end

Instance Attribute Details

#max_wait_timeObject (readonly)

Returns the value of attribute max_wait_time.



7
8
9
# File 'lib/kafka/fetcher.rb', line 7

def max_wait_time
  @max_wait_time
end

#queueObject (readonly)

Returns the value of attribute queue.



7
8
9
# File 'lib/kafka/fetcher.rb', line 7

def queue
  @queue
end

Instance Method Details

#configure(min_bytes:, max_bytes:, max_wait_time:) ⇒ Object



48
49
50
# File 'lib/kafka/fetcher.rb', line 48

def configure(min_bytes:, max_bytes:, max_wait_time:)
  @commands << [:configure, [min_bytes, max_bytes, max_wait_time]]
end

#data?Boolean

Returns:

  • (Boolean)


77
78
79
# File 'lib/kafka/fetcher.rb', line 77

def data?
  !@queue.empty?
end

#pollObject



81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/kafka/fetcher.rb', line 81

def poll
  tag, message, reset_counter = @queue.deq

  # Batches are tagged with the current reset counter value. If the batch
  # has a reset_counter < current_reset_counter, we know it was fetched
  # prior to the most recent reset and should be discarded.
  if tag == :batches && message.any? && current_reset_counter > reset_counter
    @logger.warn "Skipping stale messages buffered prior to reset"
    return tag, []
  end

  return [tag, message]
end

#resetObject



72
73
74
75
# File 'lib/kafka/fetcher.rb', line 72

def reset
  @current_reset_counter = current_reset_counter + 1
  @commands << [:reset]
end

#seek(topic, partition, offset) ⇒ Object



44
45
46
# File 'lib/kafka/fetcher.rb', line 44

def seek(topic, partition, offset)
  @commands << [:seek, [topic, partition, offset]]
end

#startObject



52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/kafka/fetcher.rb', line 52

def start
  return if @running

  @running = true

  @thread = Thread.new do
    while @running
      loop
    end
    @logger.info "#{@group} Fetcher thread exited."
  end
  @thread.abort_on_exception = true
end

#stopObject



66
67
68
69
70
# File 'lib/kafka/fetcher.rb', line 66

def stop
  return unless @running
  @commands << [:stop, []]
  @thread.join
end

#subscribe(topic, max_bytes_per_partition:) ⇒ Object



40
41
42
# File 'lib/kafka/fetcher.rb', line 40

def subscribe(topic, max_bytes_per_partition:)
  @commands << [:subscribe, [topic, max_bytes_per_partition]]
end