Class: LogStash::Outputs::HDFS

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/outputs/hdfs.rb

Overview

HDFS output.

Write events to files on HDFS. You can use fields from the event as parts of the filename.

Defined Under Namespace

Classes: DFSOutputStreamWrapper

Instance Method Summary collapse

Instance Method Details

#receive(event) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/logstash/outputs/hdfs.rb', line 64

# Writes a single event to the output stream selected by the event.
#
# The target path is @path expanded through event.sprintf. The payload is
# @message_format expanded through event.sprintf when a format is set,
# otherwise the event's JSON form. Exactly one trailing newline is ensured
# before the payload is written.
#
# @param event the event to write; must respond to #sprintf and #to_json
def receive(event)
  return unless output?(event)

  stream = get_output_stream(event.sprintf(@path))

  line = @message_format ? event.sprintf(@message_format) : event.to_json
  # Build a new string rather than appending in place so the object returned
  # by the event is never mutated.
  line = line.end_with?("\n") ? line : line + "\n"

  stream.write(line)

  flush(stream)
  close_stale_files
end

#register ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/logstash/outputs/hdfs.rb', line 39

# Prepares the output for use: imports the Hadoop classes, resets the
# bookkeeping state and opens the HDFS FileSystem handle.
#
# Reads @flush_interval, @hadoop_config_resources and @logger.
# Sets @files, @last_flush_cycle, @last_stale_cleanup_cycle,
# @flush_interval (coerced to Integer), @stale_cleanup_interval and @hdfs.
def register
  # Loaded here rather than at file level, so the Java bridge and the Hadoop
  # classes are only required when the output is actually registered.
  require "java"
  java_import "org.apache.hadoop.fs.Path"
  java_import "org.apache.hadoop.fs.FileSystem"
  java_import "org.apache.hadoop.conf.Configuration"

  @files = {}
  now = Time.now
  @last_flush_cycle = now
  @last_stale_cleanup_cycle = now
  # Write the coerced value back to the instance variable. Previously it was
  # assigned to a local that was never read, so the setting was never
  # normalised (nil or a numeric string stayed as-is).
  @flush_interval = @flush_interval.to_i
  @stale_cleanup_interval = 10 # NOTE(review): presumably seconds - confirm against close_stale_files

  conf = Configuration.new
  # Extra Hadoop configuration resources are optional.
  if @hadoop_config_resources
    @hadoop_config_resources.each do |resource|
      conf.addResource(resource)
    end
  end

  @logger.info "Using Hadoop configuration: #{conf.get("fs.defaultFS")}"
  @hdfs = FileSystem.get(conf)
end

#teardown ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/logstash/outputs/hdfs.rb', line 81

# Closes every stream tracked in @files and then signals completion by
# calling +finished+.
#
# A failure to close one file is logged and does not stop the remaining
# files from being closed; +finished+ is always reached.
def teardown
  @logger.debug("Teardown: closing files")
  @files.each do |path, fd|
    begin
      fd.close
      @logger.debug("Closed file #{path}", :fd => fd)
    # NOTE(review): rescues Exception rather than StandardError, presumably so
    # Java-side errors raised by the stream are caught under JRuby - confirm
    # before narrowing.
    rescue Exception => e
      # Include the path so the log shows which file could not be closed.
      @logger.error("Exception while flushing and closing files.", :exception => e, :path => path)
    end
  end
  finished
end