Class: Heller::Consumer
- Inherits:
-
Object
- Object
- Heller::Consumer
- Defined in:
- lib/heller/consumer.rb
Instance Method Summary collapse
- #client_id ⇒ Object
- #disconnect ⇒ Object (also: #close)
- #earliest_offset(topic, partition) ⇒ Object
- #fetch(fetch_requests, fetch_size = DEFAULT_FETCH_SIZE) ⇒ Object
-
#initialize(connect_string, options = {}) ⇒ Consumer
constructor
A new instance of Consumer.
- #latest_offset(topic, partition) ⇒ Object
- #metadata(topics = []) ⇒ Object (also: #topic_metadata)
- #offsets_before(offset_requests) ⇒ Object
Constructor Details
#initialize(connect_string, options = {}) ⇒ Consumer
Returns a new instance of Consumer.
8 9 10 11 12 13 14 |
# File 'lib/heller/consumer.rb', line 8 def initialize(connect_string, = {}) @host, @port = connect_string.split(':') = defaults.merge() @consumer = create_consumer() @build_options = .select { |k, _| BUILD_OPTIONS.include?(k) } @decoder = Kafka::Serializer::StringDecoder.new(nil) end |
Instance Method Details
#client_id ⇒ Object
16 17 18 |
# File 'lib/heller/consumer.rb', line 16 def client_id @consumer.client_id end |
#disconnect ⇒ Object Also known as: close
57 58 59 |
# File 'lib/heller/consumer.rb', line 57 def disconnect @consumer.close end |
#earliest_offset(topic, partition) ⇒ Object
47 48 49 50 |
# File 'lib/heller/consumer.rb', line 47 def earliest_offset(topic, partition) response = offsets_before(OffsetRequest.new(topic, partition, OffsetRequest.earliest_time)) response.offsets(topic, partition).first end |
#fetch(fetch_requests, fetch_size = DEFAULT_FETCH_SIZE) ⇒ Object
20 21 22 23 24 25 26 27 |
# File 'lib/heller/consumer.rb', line 20 def fetch(fetch_requests, fetch_size = DEFAULT_FETCH_SIZE) builder = create_builder(@build_options) Array(fetch_requests).each do |request| builder.add_fetch(request.topic, request.partition, request.offset, fetch_size) end raw_response = @consumer.fetch(builder.build) FetchResponse.new(raw_response, @decoder) end |
#latest_offset(topic, partition) ⇒ Object
52 53 54 55 |
# File 'lib/heller/consumer.rb', line 52 def latest_offset(topic, partition) response = offsets_before(OffsetRequest.new(topic, partition, OffsetRequest.latest_time)) response.offsets(topic, partition).last end |
#metadata(topics = []) ⇒ Object Also known as: topic_metadata
29 30 31 32 |
# File 'lib/heller/consumer.rb', line 29 def (topics=[]) request = Kafka::JavaApi::TopicMetadataRequest.new(topics) TopicMetadataResponse.new(@consumer.send(request)) end |
#offsets_before(offset_requests) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/heller/consumer.rb', line 35 def offsets_before(offset_requests) request_info = Array(offset_requests).each_with_object({}) do |request, memo| topic_partition = Kafka::Common::TopicAndPartition.new(request.topic, request.partition) partition_offset = Kafka::Api::PartitionOffsetRequestInfo.new(request.time.to_i, request.max_offsets) memo[topic_partition] = partition_offset end request = Kafka::JavaApi::OffsetRequest.new(request_info, OffsetRequest.current_version, client_id) OffsetResponse.new(@consumer.get_offsets_before(request)) end |