Class: Fluent::KafkaInput::OffsetManager

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/in_kafka.rb

Instance Method Summary collapse

Constructor Details

#initialize(topic_entry, zookeeper, zk_root_node) ⇒ OffsetManager

Returns a new instance of OffsetManager.



364
365
366
367
368
# File 'lib/fluent/plugin/in_kafka.rb', line 364

# Sets up offset tracking for a single topic/partition pair.
#
# Keeps the given ZooKeeper handle, derives the offset node path as
# "<zk_root_node>/<topic>/<partition>/next_offset", and then asks
# create_node to create that path.
#
# @param topic_entry [#topic, #partition] entry whose topic and partition
#   are used to build the node path
# @param zookeeper [Object] client used for all later node operations
# @param zk_root_node [String] prefix under which the offset node lives
def initialize(topic_entry, zookeeper, zk_root_node)
  topic = topic_entry.topic
  partition = topic_entry.partition

  @zookeeper = zookeeper
  @zk_path = "#{zk_root_node}/#{topic}/#{partition}/next_offset"

  create_node(@zk_path, topic, partition)
end

Instance Method Details

#create_node(zk_path, topic, partition) ⇒ Object



370
371
372
373
374
375
376
377
# File 'lib/fluent/plugin/in_kafka.rb', line 370

# Issues a create request for every cumulative prefix of zk_path, from the
# first segment down to the full path, then traces the final path.
#
# The result of each create call is not inspected here.
#
# @param zk_path [String] slash-separated node path
# @param topic [Object] accepted but not used by this method
# @param partition [Object] accepted but not used by this method
# @return [Object] whatever $log.trace returns
def create_node(zk_path, topic, partition)
  # The capture group makes split keep each "/segment" piece; the empty
  # strings between consecutive matches are dropped.
  segments = zk_path.split(/(\/[^\/]+)/).reject(&:empty?)

  node = segments.inject("") do |prefix, segment|
    current = prefix + segment
    @zookeeper.create(:path => current)
    current
  end

  $log.trace "use zk offset node : #{node}"
end

#next_offset ⇒ Object



379
380
381
# File 'lib/fluent/plugin/in_kafka.rb', line 379

# Reads the offset node and returns its data converted with #to_i.
#
# Because #to_i is used, nil or non-numeric data yields 0.
#
# @return [Integer] integer form of the node's :data entry
def next_offset
  node = @zookeeper.get(:path => @zk_path)
  node[:data].to_i
end

#save_offset(offset) ⇒ Object



383
384
385
386
# File 'lib/fluent/plugin/in_kafka.rb', line 383

# Writes the given offset, in string form, as the data of the offset node
# and traces the value that was written.
#
# @param offset [#to_s] offset to store
# @return [Object] whatever $log.trace returns
def save_offset(offset)
  value = offset.to_s
  @zookeeper.set(:path => @zk_path, :data => value)
  $log.trace "update zk offset node : #{value}"
end