Class: Fluent::KafkaInput::OffsetManager

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/in_kafka.rb

Instance Method Summary collapse

Constructor Details

#initialize(topic_entry, zookeeper, zk_root_node) ⇒ OffsetManager

Returns a new instance of OffsetManager.



337
338
339
340
341
# File 'lib/fluent/plugin/in_kafka.rb', line 337

# Sets up the manager for one topic/partition pair.
#
# Keeps the given client in @zookeeper, derives the offset node path
# "<zk_root_node>/<topic>/<partition>/next_offset" into @zk_path, and then
# hands that path to #create_node.
#
# @param topic_entry [#topic, #partition] supplies the topic and partition path segments
# @param zookeeper [Object] client object stored in @zookeeper
# @param zk_root_node [#to_s] leading part of the node path
def initialize(topic_entry, zookeeper, zk_root_node)
  @zookeeper = zookeeper
  partition_node = "#{zk_root_node}/#{topic_entry.topic}/#{topic_entry.partition}"
  @zk_path = "#{partition_node}/next_offset"
  create_node(@zk_path, topic_entry.topic, topic_entry.partition)
end

Instance Method Details

#create_node(zk_path, topic, partition) ⇒ Object



343
344
345
346
347
348
349
350
# File 'lib/fluent/plugin/in_kafka.rb', line 343

# Issues a create call for every cumulative prefix of zk_path.
#
# The path is cut into "/segment" pieces (any leading text before the first
# slash is kept as its own piece); each piece is appended to the prefix built
# so far and the resulting prefix is passed to @zookeeper.create. The last
# prefix is written to the trace log.
#
# NOTE(review): the result of each create call is ignored here; how the client
# reports an already-existing node is not visible in this block - confirm.
#
# @param zk_path [String] full node path to walk
# @param topic [Object] accepted but not used by this method
# @param partition [Object] accepted but not used by this method
# @return [Object] whatever $log.trace returns
def create_node(zk_path, topic, partition)
  segments = zk_path.split(%r{(/[^/]+)}).reject(&:empty?)
  deepest = segments.reduce("") do |prefix, segment|
    current = prefix + segment
    @zookeeper.create(:path => current)
    current
  end
  $log.trace "use zk offset node : #{deepest}"
end

#next_offset ⇒ Object



352
353
354
# File 'lib/fluent/plugin/in_kafka.rb', line 352

# Reads the node at @zk_path through @zookeeper.get and returns its :data
# entry converted with #to_i.
#
# @return [Integer] the :data value after #to_i (nil or empty data yields 0)
def next_offset
  node = @zookeeper.get(:path => @zk_path)
  node[:data].to_i
end

#save_offset(offset) ⇒ Object



356
357
358
359
# File 'lib/fluent/plugin/in_kafka.rb', line 356

# Writes the offset, as a string, to the node at @zk_path via @zookeeper.set
# and records the written value in the trace log.
#
# @param offset [#to_s] value to store
# @return [Object] whatever $log.trace returns
def save_offset(offset)
  payload = offset.to_s
  @zookeeper.set(:path => @zk_path, :data => payload)
  $log.trace "update zk offset node : #{payload}"
end