Class: Fluent::KafkaInput::OffsetManager

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/in_kafka.rb

Instance Method Summary collapse

Constructor Details

#initialize(topic_entry, zookeeper, zk_root_node) ⇒ OffsetManager

Returns a new instance of OffsetManager.



360
361
362
363
364
# File 'lib/fluent/plugin/in_kafka.rb', line 360

# Builds the offset-node path for one topic/partition and makes sure the
# node hierarchy has been requested from the coordination client.
#
# @param topic_entry [#topic, #partition] source of the topic name and
#   partition used in the node path
# @param zookeeper [Object] client stored as-is; the other methods call
#   +create+, +get+ and +set+ on it (presumably a ZooKeeper client - confirm
#   against the caller)
# @param zk_root_node [#to_s] prefix placed in front of the topic segment
def initialize(topic_entry, zookeeper, zk_root_node)
  topic = topic_entry.topic
  partition = topic_entry.partition

  @zookeeper = zookeeper
  # Layout: <root>/<topic>/<partition>/next_offset
  @zk_path = format("%s/%s/%s/next_offset", zk_root_node, topic, partition)

  create_node(@zk_path, topic, partition)
end

Instance Method Details

#create_node(zk_path, topic, partition) ⇒ Object



366
367
368
369
370
371
372
373
# File 'lib/fluent/plugin/in_kafka.rb', line 366

# Requests creation of every level of +zk_path+, from the outermost segment
# down to the full path, then logs the last path that was requested.
#
# The result of each +create+ call is ignored here.
#
# @param zk_path [String] slash-separated node path
# @param topic [Object] not used by this method
# @param partition [Object] not used by this method
# @return [Object] whatever <tt>$log.trace</tt> returns
def create_node(zk_path, topic, partition)
  # Splitting on a capturing pattern keeps each "/segment" piece; the empty
  # strings left between consecutive matches are discarded.
  segments = zk_path.split(/(\/[^\/]+)/).reject(&:empty?)

  path = segments.reduce("") do |parent, segment|
    node = parent + segment
    @zookeeper.create(:path => node)
    node
  end

  $log.trace "use zk offset node : #{path}"
end

#next_offset ⇒ Object



375
376
377
# File 'lib/fluent/plugin/in_kafka.rb', line 375

# Reads the node at the stored path and converts its +:data+ entry to an
# integer with +to_i+.
#
# @return [Integer] the stored value as converted by +to_i+
def next_offset
  node = @zookeeper.get(:path => @zk_path)
  node[:data].to_i
end

#save_offset(offset) ⇒ Object



379
380
381
382
# File 'lib/fluent/plugin/in_kafka.rb', line 379

# Writes +offset+, converted to a string, as the data of the node at the
# stored path and logs the written value.
#
# @param offset [#to_s] value to store
# @return [Object] whatever <tt>$log.trace</tt> returns
def save_offset(offset)
  offset_text = offset.to_s
  @zookeeper.set(:path => @zk_path, :data => offset_text)
  $log.trace "update zk offset node : #{offset_text}"
end