Class: LogStash::Outputs::HDFS
- Inherits:
-
Base
- Object
- Base
- LogStash::Outputs::HDFS
- Defined in:
- lib/logstash/outputs/hdfs.rb
Overview
HDFS output.
Write events to files in HDFS. You can use fields from the event as parts of the filename.
Defined Under Namespace
Classes: DFSOutputStreamWrapper
Instance Method Summary collapse
- #receive(event) ⇒ Object
- #register ⇒ Object
-
#teardown ⇒ Object
def teardown.
Instance Method Details
#receive(event) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/logstash/outputs/hdfs.rb', line 64

# Handle one incoming event: serialize it and append it to the HDFS
# file whose path is produced by interpolating @path against the event.
#
# @param event [LogStash::Event] the event to write
# @return [Object]
def receive(event)
  return unless output?(event)

  # Resolve (or lazily open) the stream for the field-dependent path.
  out = get_output_stream(event.sprintf(@path))

  # Serialize using the user-supplied format string when configured,
  # otherwise fall back to the event's JSON representation.
  line = @message_format ? event.sprintf(@message_format) : event.to_json
  line += "\n" unless line.end_with?("\n")

  out.write(line)
  flush(out)
  close_stale_files
end
#register ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/logstash/outputs/hdfs.rb', line 39

# Initialize the plugin: load the Hadoop Java classes, reset the open-file
# registry and flush/cleanup timers, build the Hadoop configuration, and
# connect to the configured HDFS filesystem.
#
# @return [Object]
def register
  require "java"
  java_import "org.apache.hadoop.fs.Path"
  java_import "org.apache.hadoop.fs.FileSystem"
  java_import "org.apache.hadoop.conf.Configuration"

  # Map of resolved path => open output stream wrapper.
  @files = {}
  now = Time.now
  @last_flush_cycle = now
  @last_stale_cleanup_cycle = now
  # FIX: the original assigned `@flush_interval.to_i` to a throwaway local
  # (`flush_interval`), leaving the instance variable un-coerced; normalize
  # it to an Integer so interval comparisons behave as intended.
  @flush_interval = @flush_interval.to_i
  # Seconds between scans for stale (idle) open files.
  @stale_cleanup_interval = 10

  conf = Configuration.new
  if @hadoop_config_resources
    @hadoop_config_resources.each { |resource| conf.addResource(resource) }
  end
  @logger.info "Using Hadoop configuration: #{conf.get("fs.defaultFS")}"
  @hdfs = FileSystem.get(conf)
end
#teardown ⇒ Object
def teardown
81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/logstash/outputs/hdfs.rb', line 81

# Close every open HDFS output stream and mark the plugin as finished.
# A failure closing one file is logged and does not prevent the rest
# from being closed.
#
# @return [Object]
def teardown
  @logger.debug("Teardown: closing files")
  @files.each do |path, fd|
    begin
      fd.close
      @logger.debug("Closed file #{path}", :fd => fd)
    rescue StandardError => e
      # FIX: was `rescue Exception`, which also swallows SignalException /
      # SystemExit; rescue StandardError instead. Also corrected the
      # misspelled "Excpetion" in the log message.
      @logger.error("Exception while flushing and closing files.", :exception => e)
    end
  end
  finished
end