Class: Kafka::Fetcher

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/fetcher.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(cluster:, logger:, instrumenter:, max_queue_size:, group:) ⇒ Fetcher

Returns a new instance of Fetcher.



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/kafka/fetcher.rb', line 9

def initialize(cluster:, logger:, instrumenter:, max_queue_size:, group:)
  @cluster = cluster
  @logger = logger
  @instrumenter = instrumenter
  @max_queue_size = max_queue_size
  @group = group

  @queue = Queue.new
  @commands = Queue.new
  @next_offsets = Hash.new { |h, k| h[k] = {} }

  # Long poll until at least this many bytes can be fetched.
  @min_bytes = 1

  # Long poll at most this number of seconds.
  @max_wait_time = 1

  # The maximum number of bytes to fetch for any given fetch request.
  @max_bytes = 10485760

  # The maximum number of bytes to fetch per partition, by topic.
  @max_bytes_per_partition = {}

  @thread = Thread.new do
    loop while true
  end

  @thread.abort_on_exception = true
end

Instance Attribute Details

#queueObject (readonly)

Returns the value of attribute queue.



7
8
9
# File 'lib/kafka/fetcher.rb', line 7

def queue
  @queue
end

Instance Method Details

#configure(min_bytes:, max_bytes:, max_wait_time:) ⇒ Object



47
48
49
# File 'lib/kafka/fetcher.rb', line 47

def configure(min_bytes:, max_bytes:, max_wait_time:)
  @commands << [:configure, [min_bytes, max_bytes, max_wait_time]]
end

#data?Boolean

Returns:

  • (Boolean)


69
70
71
# File 'lib/kafka/fetcher.rb', line 69

def data?
  !@queue.empty?
end

#handle_startObject



55
56
57
58
59
# File 'lib/kafka/fetcher.rb', line 55

def handle_start
  raise "already started" if @running

  @running = true
end

#pollObject



73
74
75
# File 'lib/kafka/fetcher.rb', line 73

def poll
  @queue.deq
end

#resetObject



65
66
67
# File 'lib/kafka/fetcher.rb', line 65

def reset
  @commands << [:reset, []]
end

#seek(topic, partition, offset) ⇒ Object



43
44
45
# File 'lib/kafka/fetcher.rb', line 43

def seek(topic, partition, offset)
  @commands << [:seek, [topic, partition, offset]]
end

#startObject



51
52
53
# File 'lib/kafka/fetcher.rb', line 51

def start
  @commands << [:start, []]
end

#stopObject



61
62
63
# File 'lib/kafka/fetcher.rb', line 61

def stop
  @commands << [:stop, []]
end

#subscribe(topic, max_bytes_per_partition:) ⇒ Object



39
40
41
# File 'lib/kafka/fetcher.rb', line 39

def subscribe(topic, max_bytes_per_partition:)
  @commands << [:subscribe, [topic, max_bytes_per_partition]]
end