Class: Kafka::Fetcher

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/fetcher.rb

Constant Summary collapse

MAX_QUEUE_SIZE =
100

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(cluster:, logger:, instrumenter:) ⇒ Fetcher

Returns a new instance of Fetcher.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/kafka/fetcher.rb', line 11

# Builds a fetcher, sets its default fetch settings, and immediately starts
# a background thread.
#
# @param cluster stored in @cluster; not otherwise used in this method
# @param logger stored in @logger; not otherwise used in this method
# @param instrumenter stored in @instrumenter; not otherwise used in this method
def initialize(cluster:, logger:, instrumenter:)
  # Collaborators handed in by the caller.
  @instrumenter = instrumenter
  @logger = logger
  @cluster = cluster

  # Two queues: @queue is what #queue / #poll / #data? read from, @commands
  # is what the public command methods (seek, start, stop, ...) append to.
  @commands = Queue.new
  @queue = Queue.new

  # Looking up a missing key stores and returns a fresh empty hash.
  @next_offsets = Hash.new { |table, key| table[key] = {} }

  # Long poll until at least this many bytes can be fetched.
  @min_bytes = 1

  # Long poll at most this number of seconds.
  @max_wait_time = 1

  # The maximum number of bytes to fetch for any given fetch request (10 MiB).
  @max_bytes = 10 * 1024 * 1024

  # The maximum number of bytes to fetch per partition, by topic.
  @max_bytes_per_partition = {}

  # All state above is assigned before the thread is created.
  # NOTE(review): `loop` is deliberately called bare and without a block on
  # every iteration; presumably it resolves to a method defined elsewhere in
  # this class rather than Kernel#loop -- confirm before changing this.
  @thread = Thread.new do
    while true
      loop
    end
  end

  # An unhandled exception in the background thread is re-raised in the
  # main thread instead of only terminating the background thread.
  @thread.abort_on_exception = true
end

Instance Attribute Details

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



9
10
11
# File 'lib/kafka/fetcher.rb', line 9

# Reader for the @queue instance variable.
#
# @return [Object] the current value of @queue
def queue
  @queue
end

Instance Method Details

#configure(min_bytes:, max_bytes:, max_wait_time:) ⇒ Object



47
48
49
# File 'lib/kafka/fetcher.rb', line 47

# Appends a :configure command to @commands; the settings are not applied
# by this method itself.
#
# @param min_bytes first element of the command payload
# @param max_bytes second element of the command payload
# @param max_wait_time third element of the command payload
# @return the @commands queue
def configure(min_bytes:, max_bytes:, max_wait_time:)
  settings = [min_bytes, max_bytes, max_wait_time]
  @commands.push([:configure, settings])
end

#data? ⇒ Boolean

Returns:

  • (Boolean)


69
70
71
# File 'lib/kafka/fetcher.rb', line 69

# Reports whether @queue currently holds at least one item.
#
# @return [Boolean] true when @queue is non-empty, false otherwise
def data?
  @queue.size > 0
end

#handle_start ⇒ Object



55
56
57
58
59
# File 'lib/kafka/fetcher.rb', line 55

# Marks the fetcher as running by setting @running to true.
#
# @raise [RuntimeError] with message "already started" if @running is
#   already truthy
# @return [true]
def handle_start
  if @running
    raise "already started"
  end

  @running = true
end

#poll ⇒ Object



73
74
75
# File 'lib/kafka/fetcher.rb', line 73

# Removes and returns the next item from @queue. With a standard Queue this
# blocks the caller until an item is available.
#
# @return [Object] the item taken from the front of @queue
def poll
  @queue.pop
end

#reset ⇒ Object



65
66
67
# File 'lib/kafka/fetcher.rb', line 65

# Appends a :reset command, with an empty argument list, to @commands.
#
# @return the @commands queue
def reset
  no_args = []
  @commands.push([:reset, no_args])
end

#seek(topic, partition, offset) ⇒ Object



43
44
45
# File 'lib/kafka/fetcher.rb', line 43

# Appends a :seek command to @commands; the seek itself is not performed by
# this method.
#
# @param topic first element of the command payload
# @param partition second element of the command payload
# @param offset third element of the command payload
# @return the @commands queue
def seek(topic, partition, offset)
  target = [topic, partition, offset]
  @commands.push([:seek, target])
end

#start ⇒ Object



51
52
53
# File 'lib/kafka/fetcher.rb', line 51

# Appends a :start command, with an empty argument list, to @commands.
# This method does not itself change any running state.
#
# @return the @commands queue
def start
  command = [:start, []]
  @commands.push(command)
end

#stop ⇒ Object



61
62
63
# File 'lib/kafka/fetcher.rb', line 61

# Appends a :stop command, with an empty argument list, to @commands.
# This method does not itself change any running state.
#
# @return the @commands queue
def stop
  command = [:stop, []]
  @commands.push(command)
end

#subscribe(topic, max_bytes_per_partition:) ⇒ Object



39
40
41
# File 'lib/kafka/fetcher.rb', line 39

# Appends a :subscribe command to @commands; the subscription is not set up
# by this method itself.
#
# @param topic first element of the command payload
# @param max_bytes_per_partition second element of the command payload
# @return the @commands queue
def subscribe(topic, max_bytes_per_partition:)
  subscription = [topic, max_bytes_per_partition]
  @commands.push([:subscribe, subscription])
end