Class: Kafka::Fetcher

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/fetcher.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(cluster:, logger:, instrumenter:, max_queue_size:) ⇒ Fetcher

Returns a new instance of Fetcher.



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/kafka/fetcher.rb', line 9

# Sets up the fetcher's state and starts its background thread.
#
# @param cluster [Object] stored as-is in @cluster
# @param logger [Object] stored as-is in @logger
# @param instrumenter [Object] stored as-is in @instrumenter
# @param max_queue_size [Integer] stored in @max_queue_size; presumably an
#   upper bound on the items buffered in @queue — it is not enforced in this
#   method, confirm where it is used
def initialize(cluster:, logger:, instrumenter:, max_queue_size:)
  @cluster = cluster
  @logger = logger
  @instrumenter = instrumenter
  @max_queue_size = max_queue_size

  # @queue is what #queue, #poll and #data? read from; @commands receives the
  # [name, args] pairs pushed by the public command methods (#seek, #start, ...).
  @queue = Queue.new
  @commands = Queue.new
  # Two-level hash: reading a missing key creates and stores an empty inner
  # hash. Presumably topic => { partition => offset } — TODO confirm.
  @next_offsets = Hash.new { |h, k| h[k] = {} }

  # Long poll until at least this many bytes can be fetched.
  @min_bytes = 1

  # Long poll at most this number of seconds.
  @max_wait_time = 1

  # The maximum number of bytes to fetch for any given fetch request
  # (10485760 = 10 * 1024 * 1024).
  @max_bytes = 10485760

  # The maximum number of bytes to fetch per partition, by topic.
  @max_bytes_per_partition = {}

  # The thread is created only after all of the state above has been assigned.
  # Its body calls `loop` over and over, forever.
  # NOTE(review): presumably `loop` is an instance method defined elsewhere in
  # this class rather than Kernel#loop — confirm before touching this line.
  @thread = Thread.new do
    loop while true
  end

  # An unhandled exception inside the thread is re-raised in the main thread
  # instead of only terminating the background thread.
  @thread.abort_on_exception = true
end

Instance Attribute Details

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



7
8
9
# File 'lib/kafka/fetcher.rb', line 7

# Read-only accessor for the fetcher's output queue.
#
# @return [Object] whatever is currently stored in @queue (nil if unset)
def queue; @queue; end

Instance Method Details

#configure(min_bytes:, max_bytes:, max_wait_time:) ⇒ Object



46
47
48
# File 'lib/kafka/fetcher.rb', line 46

# Enqueues a :configure command carrying the new fetch settings.
#
# Nothing is applied here; the method only pushes
# [:configure, [min_bytes, max_bytes, max_wait_time]] onto @commands.
#
# @param min_bytes [Object] first element of the command payload
# @param max_bytes [Object] second element of the command payload
# @param max_wait_time [Object] third element of the command payload
# @return [Object] the result of pushing onto @commands
def configure(min_bytes:, max_bytes:, max_wait_time:)
  settings = [min_bytes, max_bytes, max_wait_time]
  @commands.push([:configure, settings])
end

#data? ⇒ Boolean

Returns:

  • (Boolean)


68
69
70
# File 'lib/kafka/fetcher.rb', line 68

# Reports whether @queue currently holds at least one item.
#
# @return [Boolean] false when @queue is empty, true otherwise
def data?
  return false if @queue.empty?

  true
end

#handle_start ⇒ Object



54
55
56
57
58
# File 'lib/kafka/fetcher.rb', line 54

# Marks the fetcher as running.
#
# @raise [RuntimeError] with message "already started" if @running is
#   already truthy
# @return [true]
def handle_start
  if @running
    raise "already started"
  end

  @running = true
end

#poll ⇒ Object



72
73
74
# File 'lib/kafka/fetcher.rb', line 72

# Removes and returns the next item from @queue.
#
# With a Queue (as assigned in #initialize) this blocks the calling thread
# while the queue is empty.
#
# @return [Object] the item at the head of the queue
def poll
  @queue.pop
end

#reset ⇒ Object



64
65
66
# File 'lib/kafka/fetcher.rb', line 64

# Enqueues a :reset command with no arguments.
#
# Only pushes [:reset, []] onto @commands; no state is changed here.
#
# @return [Object] the result of pushing onto @commands
def reset
  no_args = []
  @commands.push([:reset, no_args])
end

#seek(topic, partition, offset) ⇒ Object



42
43
44
# File 'lib/kafka/fetcher.rb', line 42

# Enqueues a :seek command for the given topic, partition and offset.
#
# Only pushes [:seek, [topic, partition, offset]] onto @commands; no offset
# is changed here.
#
# @param topic [Object] first element of the command payload
# @param partition [Object] second element of the command payload
# @param offset [Object] third element of the command payload
# @return [Object] the result of pushing onto @commands
def seek(topic, partition, offset)
  target = [topic, partition, offset]
  @commands.push([:seek, target])
end

#start ⇒ Object



50
51
52
# File 'lib/kafka/fetcher.rb', line 50

# Enqueues a :start command with no arguments.
#
# Only pushes [:start, []] onto @commands; @running is not touched here.
#
# @return [Object] the result of pushing onto @commands
def start
  no_args = []
  @commands.push([:start, no_args])
end

#stop ⇒ Object



60
61
62
# File 'lib/kafka/fetcher.rb', line 60

# Enqueues a :stop command with no arguments.
#
# Only pushes [:stop, []] onto @commands; nothing is halted here.
#
# @return [Object] the result of pushing onto @commands
def stop
  no_args = []
  @commands.push([:stop, no_args])
end

#subscribe(topic, max_bytes_per_partition:) ⇒ Object



38
39
40
# File 'lib/kafka/fetcher.rb', line 38

# Enqueues a :subscribe command for a topic.
#
# Only pushes [:subscribe, [topic, max_bytes_per_partition]] onto @commands;
# no subscription state is changed here.
#
# @param topic [Object] first element of the command payload
# @param max_bytes_per_partition [Object] second element of the command payload
# @return [Object] the result of pushing onto @commands
def subscribe(topic, max_bytes_per_partition:)
  subscription = [topic, max_bytes_per_partition]
  @commands.push([:subscribe, subscription])
end