Class: Fluent::WebHDFSOutput

Inherits:
TimeSlicedOutput
  • Object
show all
Includes:
Mixin::ConfigPlaceholders, Mixin::PlainTextFormatter
Defined in:
lib/fluent/plugin/out_webhdfs.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ WebHDFSOutput

Returns a new instance of WebHDFSOutput.



32
33
34
35
36
37
# File 'lib/fluent/plugin/out_webhdfs.rb', line 32

# Builds the plugin instance and loads the libraries it depends on.
#
# The parent constructor runs first; the libraries are then required here, at
# instantiation time, instead of when this file is loaded.
def initialize
  super
  %w[net/http time webhdfs].each { |library| require library }
end

Instance Method Details

#configure(conf) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/fluent/plugin/out_webhdfs.rb', line 39

# Validates the configuration and prepares the WebHDFS client.
#
# When conf['path'] contains a time placeholder, conf['time_slice_format'] is
# overwritten before the parent implementation is called, using the finest
# placeholder found: %S, then %M, then %H. A path without any of these leaves
# the setting untouched.
#
# The namenode address is taken from @host/@port when @host is set; otherwise
# it is parsed from @namenode, which must look like "NAME:PORT".
#
# @param conf [#[], #[]=] configuration handed on to the parent class
# @raise [Fluent::ConfigError] when neither host nor namenode is set, when
#   namenode is not of the form NAME:PORT, or when path does not start with '/'
def configure(conf)
  path_template = conf['path']
  if path_template
    # Finest placeholder wins, so the checks run from seconds up to hours.
    if path_template.include?('%S')
      conf['time_slice_format'] = '%Y%m%d%H%M%S'
    elsif path_template.include?('%M')
      conf['time_slice_format'] = '%Y%m%d%H%M'
    elsif path_template.include?('%H')
      conf['time_slice_format'] = '%Y%m%d%H'
    end
  end

  # NOTE(review): presumably the parent populates @host, @port, @namenode,
  # @path and the other settings read below -- confirm against the
  # config_param declarations.
  super

  if @host
    @namenode_host = @host
    @namenode_port = @port
  elsif @namenode
    # \z (not \Z) anchors at the very end of the string, so a value that ends
    # in a newline is rejected instead of being silently accepted.
    address = /\A([a-zA-Z0-9][-a-zA-Z0-9.]*):(\d+)\z/.match(@namenode)
    unless address
      raise Fluent::ConfigError, "Invalid config value about namenode: '#{@namenode}', needs NAMENODE_NAME:PORT"
    end
    @namenode_host = address[1]
    @namenode_port = address[2].to_i
  else
    raise Fluent::ConfigError, "WebHDFS host or namenode missing"
  end

  unless @path.start_with?('/')
    raise Fluent::ConfigError, "Path on hdfs MUST starts with '/', but '#{@path}'"
  end

  @client = WebHDFS::Client.new(@namenode_host, @namenode_port, @username)
  @client.httpfs_mode = true if @httpfs
  @client.open_timeout = @open_timeout
  @client.read_timeout = @read_timeout
end

#path_format(chunk_key) ⇒ Object



92
93
94
# File 'lib/fluent/plugin/out_webhdfs.rb', line 92

# Builds the destination path for a chunk.
#
# The chunk key is parsed with @time_slice_format, and the resulting time is
# then formatted using @path as the strftime template.
#
# @param chunk_key [String] key of the chunk, in @time_slice_format
# @return [String] @path with its time placeholders filled in
def path_format(chunk_key)
  slice_time = Time.strptime(chunk_key, @time_slice_format)
  slice_time.strftime(@path)
end

#send_data(path, data) ⇒ Object

TODO: check for conflicts



98
99
100
101
102
103
104
# File 'lib/fluent/plugin/out_webhdfs.rb', line 98

# Writes data to the given HDFS path through the client.
#
# An append is attempted first; if the client reports that the file does not
# exist, the file is created with the same data instead.
#
# @param path [String] destination path on HDFS
# @param data [String] payload to write
# @return [Object] whatever the client's append (or create) call returned
def send_data(path, data)
  @client.append(path, data)
rescue WebHDFS::FileNotFoundError
  @client.create(path, data)
end

#shutdown ⇒ Object



88
89
90
# File 'lib/fluent/plugin/out_webhdfs.rb', line 88

# Defers entirely to the parent class's shutdown; this method performs no
# plugin-specific cleanup of its own.
def shutdown
  super
end

#start ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
# File 'lib/fluent/plugin/out_webhdfs.rb', line 76

# Starts the plugin and checks that the WebHDFS endpoint can be reached.
#
# After the parent's start has run, the root directory is listed once as a
# connection check. A failure is logged and then re-raised, unless
# @ignore_start_check_error is set, in which case startup continues.
#
# @raise [StandardError] the error from the check request, when it fails and
#   @ignore_start_check_error is not set
def start
  super

  begin
    # Only success or failure matters here; the listing itself is discarded.
    @client.list('/')
    $log.info "webhdfs connection confirmed: #{@namenode_host}:#{@namenode_port}"
  rescue
    $log.error "webhdfs check request failed!"
    raise unless @ignore_start_check_error
  end
end

#write(chunk) ⇒ Object



106
107
108
109
110
111
112
113
114
115
# File 'lib/fluent/plugin/out_webhdfs.rb', line 106

# Sends one buffered chunk to HDFS.
#
# The destination is derived from the chunk key via path_format. Errors raised
# while reading the chunk or sending its data are logged together with the
# destination path and then re-raised unchanged.
#
# @param chunk [#key, #read] buffer chunk to flush
# @return [Object] the destination path returned by path_format
def write(chunk)
  path_format(chunk.key).tap do |destination|
    begin
      send_data(destination, chunk.read)
    rescue StandardError
      $log.error "failed to communicate hdfs cluster, path: #{destination}"
      raise
    end
  end
end