Class: Kafka::Fetcher
- Inherits: Object
  - Object
    - Kafka::Fetcher
- Defined in: lib/kafka/fetcher.rb
Constant Summary
- MAX_QUEUE_SIZE = 100
Instance Attribute Summary
- #queue ⇒ Object (readonly): Returns the value of attribute queue.
Instance Method Summary
- #configure(min_bytes:, max_bytes:, max_wait_time:) ⇒ Object
- #data? ⇒ Boolean
- #handle_start ⇒ Object
- #initialize(cluster:, logger:, instrumenter:) ⇒ Fetcher (constructor): A new instance of Fetcher.
- #poll ⇒ Object
- #reset ⇒ Object
- #seek(topic, partition, offset) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
- #subscribe(topic, max_bytes_per_partition:) ⇒ Object
Constructor Details
#initialize(cluster:, logger:, instrumenter:) ⇒ Fetcher
Returns a new instance of Fetcher.
# File 'lib/kafka/fetcher.rb', line 11
def initialize(cluster:, logger:, instrumenter:)
  @cluster = cluster
  @logger = logger
  @instrumenter = instrumenter

  @queue = Queue.new
  @commands = Queue.new
  @next_offsets = Hash.new { |h, k| h[k] = {} }

  # Long poll until at least this many bytes can be fetched.
  @min_bytes = 1

  # Long poll at most this number of seconds.
  @max_wait_time = 1

  # The maximum number of bytes to fetch for any given fetch request.
  @max_bytes = 10485760

  # The maximum number of bytes to fetch per partition, by topic.
  @max_bytes_per_partition = {}

  @thread = Thread.new do
    loop while true
  end

  @thread.abort_on_exception = true
end
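The constructor creates a `@commands` queue and a background thread, and every public method documented below only pushes a `[name, args]` pair onto that queue. The following is a minimal sketch of that command-queue pattern, not the library's implementation: `ToyFetcher`, its `log` queue, the `:shutdown` command, and the `send("handle_#{command}", *args)` dispatch are all assumptions made for illustration. Only `handle_start` and the shape of the enqueued pairs come from this page.

```ruby
# Toy illustration of the command-queue pattern.
# ToyFetcher is hypothetical and is NOT the real Kafka::Fetcher.
class ToyFetcher
  attr_reader :log

  def initialize
    @commands = Queue.new   # thread-safe FIFO of [name, args] pairs
    @log = Queue.new        # records what the worker thread handled (sketch only)
    @running = false

    @thread = Thread.new do
      loop do
        command, args = @commands.deq      # blocks until a command arrives
        break if command == :shutdown      # hypothetical exit command for this sketch
        send("handle_#{command}", *args)   # assumed dispatch convention
      end
    end
    @thread.abort_on_exception = true
  end

  # Public API: each call only enqueues a message and returns immediately.
  def start
    @commands << [:start, []]
  end

  def seek(topic, partition, offset)
    @commands << [:seek, [topic, partition, offset]]
  end

  def shutdown
    @commands << [:shutdown, []]
    @thread.join
  end

  private

  # Handlers run on the worker thread, so state such as @running
  # is only ever touched from one thread.
  def handle_start
    raise "already started" if @running
    @running = true
    @log << [:started]
  end

  def handle_seek(topic, partition, offset)
    @log << [:seek, topic, partition, offset]
  end
end

fetcher = ToyFetcher.new
fetcher.start
fetcher.seek("events", 0, 42)
fetcher.shutdown
```

Because callers only enqueue messages, they never block on network work, and commands are handled in the order they were issued.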
Instance Attribute Details
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
# File 'lib/kafka/fetcher.rb', line 9
def queue
  @queue
end
Instance Method Details
#configure(min_bytes:, max_bytes:, max_wait_time:) ⇒ Object
# File 'lib/kafka/fetcher.rb', line 47
def configure(min_bytes:, max_bytes:, max_wait_time:)
  @commands << [:configure, [min_bytes, max_bytes, max_wait_time]]
end
#data? ⇒ Boolean
# File 'lib/kafka/fetcher.rb', line 69
def data?
  !@queue.empty?
end
#handle_start ⇒ Object
# File 'lib/kafka/fetcher.rb', line 55
def handle_start
  raise "already started" if @running

  @running = true
end
#poll ⇒ Object
# File 'lib/kafka/fetcher.rb', line 73
def poll
  @queue.deq
end
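Since `#poll` calls `Queue#deq`, it blocks while the queue is empty, whereas `#data?` returns immediately. A caller that must not block can check `#data?` first. The sketch below shows that pairing on a hypothetical stand-in class (`BufferedSource` and its `push` method are not part of the library). It assumes a single consumer, because another thread could empty the queue between the check and the dequeue.

```ruby
# Hypothetical stand-in exposing the same two methods as the documented class.
class BufferedSource
  def initialize
    @queue = Queue.new
  end

  # Sketch-only helper to fill the queue; the real class fills it internally.
  def push(item)
    @queue << item
  end

  def data?
    !@queue.empty?
  end

  def poll
    @queue.deq   # blocks when the queue is empty
  end
end

source = BufferedSource.new
source.push(:batch_1)
source.push(:batch_2)

# Drain everything currently buffered without ever blocking.
drained = []
drained << source.poll while source.data?
```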
#reset ⇒ Object
# File 'lib/kafka/fetcher.rb', line 65
def reset
  @commands << [:reset, []]
end
#seek(topic, partition, offset) ⇒ Object
# File 'lib/kafka/fetcher.rb', line 43
def seek(topic, partition, offset)
  @commands << [:seek, [topic, partition, offset]]
end
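The constructor initializes `@next_offsets` as `Hash.new { |h, k| h[k] = {} }`, a hash that creates an empty inner hash the first time a key is read. How the `:seek` handler uses it is not shown on this page. The sketch below assumes a `topic => { partition => offset }` layout purely to illustrate how such a hash behaves.

```ruby
# Same construction as in the constructor: reading a missing key
# stores and returns a fresh inner hash.
next_offsets = Hash.new { |h, k| h[k] = {} }

# Assumed bookkeeping layout: topic => { partition => offset }.
next_offsets["events"][0] = 42
next_offsets["events"][1] = 7
next_offsets["logs"][0] = 100

# Hash#fetch bypasses the default block, so probing an unknown topic
# this way does not create an entry for it.
untouched = next_offsets.fetch("metrics", :absent)
```

No explicit initialization per topic is needed, but note that a plain `next_offsets["metrics"]` read would insert an empty hash as a side effect.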
#start ⇒ Object
# File 'lib/kafka/fetcher.rb', line 51
def start
  @commands << [:start, []]
end
#stop ⇒ Object
# File 'lib/kafka/fetcher.rb', line 61
def stop
  @commands << [:stop, []]
end
#subscribe(topic, max_bytes_per_partition:) ⇒ Object
# File 'lib/kafka/fetcher.rb', line 39
def subscribe(topic, max_bytes_per_partition:)
  @commands << [:subscribe, [topic, max_bytes_per_partition]]
end