Class: Heller::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/heller/consumer.rb

Instance Method Summary collapse

Constructor Details

#initialize(connect_string, options = {}) ⇒ Consumer

Returns a new instance of Consumer.



8
9
10
11
12
13
14
# File 'lib/heller/consumer.rb', line 8

# Sets up a consumer for the broker named by +connect_string+.
#
# @param connect_string [String] broker address; split on ':' and the first
#   two pieces are stored in @host and @port
# @param options [Hash] overrides merged on top of +defaults+
def initialize(connect_string, options = {})
  @host, @port = connect_string.split(':')
  merged = defaults.merge(options)
  @consumer = create_consumer(merged)
  # Keep only the entries named in BUILD_OPTIONS; #fetch hands them to the builder.
  @build_options = merged.select { |key, _value| BUILD_OPTIONS.include?(key) }
  @decoder = Kafka::Serializer::StringDecoder.new(nil)
end

Instance Method Details

#client_id ⇒ Object



16
17
18
# File 'lib/heller/consumer.rb', line 16

# Reports the client id of the wrapped consumer.
#
# @return [Object] the value of +@consumer.client_id+
def client_id
  underlying = @consumer
  underlying.client_id
end

#disconnect ⇒ Object Also known as: close



57
58
59
# File 'lib/heller/consumer.rb', line 57

# Closes the wrapped consumer.
#
# @return [Object] whatever +@consumer.close+ returns
def disconnect
  underlying = @consumer
  underlying.close
end

#earliest_offset(topic, partition) ⇒ Object



47
48
49
50
# File 'lib/heller/consumer.rb', line 47

# Looks up offsets for +topic+/+partition+ using OffsetRequest.earliest_time
# and returns the first one in the response.
#
# @param topic [Object] topic passed to the offset request and the response lookup
# @param partition [Object] partition passed to the offset request and the response lookup
# @return [Object] first element of +response.offsets(topic, partition)+
def earliest_offset(topic, partition)
  earliest_request = OffsetRequest.new(topic, partition, OffsetRequest.earliest_time)
  offsets_before(earliest_request).offsets(topic, partition).first
end

#fetch(fetch_requests, fetch_size = DEFAULT_FETCH_SIZE) ⇒ Object



20
21
22
23
24
25
26
27
# File 'lib/heller/consumer.rb', line 20

# Builds one fetch request covering every entry in +fetch_requests+, sends it
# through the wrapped consumer and wraps the result.
#
# @param fetch_requests [Object, Array] a single request or a collection of
#   them (normalised with Kernel#Array); each must respond to
#   +topic+, +partition+ and +offset+
# @param fetch_size [Object] passed as the last argument of +add_fetch+ for
#   every request
# @return [FetchResponse] the raw consumer response paired with @decoder
def fetch(fetch_requests, fetch_size = DEFAULT_FETCH_SIZE)
  request_builder = create_builder(@build_options)
  Array(fetch_requests).each do |req|
    request_builder.add_fetch(req.topic, req.partition, req.offset, fetch_size)
  end
  FetchResponse.new(@consumer.fetch(request_builder.build), @decoder)
end

#latest_offset(topic, partition) ⇒ Object



52
53
54
55
# File 'lib/heller/consumer.rb', line 52

# Looks up offsets for +topic+/+partition+ using OffsetRequest.latest_time
# and returns the last one in the response.
#
# @param topic [Object] topic passed to the offset request and the response lookup
# @param partition [Object] partition passed to the offset request and the response lookup
# @return [Object] last element of +response.offsets(topic, partition)+
def latest_offset(topic, partition)
  latest_request = OffsetRequest.new(topic, partition, OffsetRequest.latest_time)
  offsets_before(latest_request).offsets(topic, partition).last
end

#metadata(topics = []) ⇒ Object Also known as: topic_metadata



29
30
31
32
# File 'lib/heller/consumer.rb', line 29

# Requests topic metadata through the wrapped consumer.
#
# @param topics [Array] topics handed to Kafka::JavaApi::TopicMetadataRequest;
#   defaults to an empty list
# @return [TopicMetadataResponse] wrapper around the consumer's reply
def metadata(topics=[])
  request = Kafka::JavaApi::TopicMetadataRequest.new(topics)
  # NOTE(review): relies on @consumer exposing its own #send that accepts the
  # request object (presumably the Java consumer's send) — confirm.
  TopicMetadataResponse.new(@consumer.send(request))
end

#offsets_before(offset_requests) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
# File 'lib/heller/consumer.rb', line 35

# Sends an offset request covering every entry in +offset_requests+ and wraps
# the consumer's reply.
#
# @param offset_requests [Object, Array] a single request or a collection of
#   them (normalised with Kernel#Array); each must respond to +topic+,
#   +partition+, +time+ and +max_offsets+
# @return [OffsetResponse] wrapper around +@consumer.get_offsets_before+
def offsets_before(offset_requests)
  request_info = {}
  Array(offset_requests).each do |offset_request|
    key = Kafka::Common::TopicAndPartition.new(offset_request.topic, offset_request.partition)
    # The time is coerced with #to_i before being handed to the request info.
    request_info[key] = Kafka::Api::PartitionOffsetRequestInfo.new(offset_request.time.to_i, offset_request.max_offsets)
  end

  java_request = Kafka::JavaApi::OffsetRequest.new(request_info, OffsetRequest.current_version, client_id)
  OffsetResponse.new(@consumer.get_offsets_before(java_request))
end