Class: Triglav::Agent::Hdfs::Connection

Inherits:
Base::Connection
  • Object
show all
Defined in:
lib/triglav/agent/hdfs/connection.rb

Constant Summary collapse

Path =
org.apache.hadoop.fs.Path
DistributedFileSystem =
org.apache.hadoop.hdfs.DistributedFileSystem
PathFilter =
org.apache.hadoop.fs.PathFilter
FileSystem =
org.apache.hadoop.fs.FileSystem

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection_info) ⇒ Connection

Returns a new instance of Connection.

Parameters:

  • connection_info (Hash)

Options Hash (connection_info):

  • :config_files (Array)

    config files for org.apache.hadoop.conf.Configuration

  • :config (Hash)

    config key value parameters for org.apache.hadoop.conf.Configuration

  • :doas (String)


20
21
22
23
24
# File 'lib/triglav/agent/hdfs/connection.rb', line 20

def initialize(connection_info)
  @connection_info = connection_info
  @configurations = {}
  @filesystems = {}
end

Instance Attribute Details

#connection_infoObject (readonly)

Returns the value of attribute connection_info.



14
15
16
# File 'lib/triglav/agent/hdfs/connection.rb', line 14

def connection_info
  @connection_info
end

Instance Method Details

#closeObject



26
27
28
# File 'lib/triglav/agent/hdfs/connection.rb', line 26

def close
  @filesystems.values.each {|fs| fs.close rescue nil }
end

#delete(path, recursive = false) ⇒ Boolean

for test

Parameters:

  • hdfs (String)

    path

Returns:

  • (Boolean)

    true for success



79
80
81
82
# File 'lib/triglav/agent/hdfs/connection.rb', line 79

def delete(path, recursive = false)
  fs = get_fs(namespace = URI.parse(path).host)
  fs.delete(Path.new(path), recursive)
end

#get_latest_file_under(paths) ⇒ org.apache.hadoop.fs.FileStatus

Get latest modification file under given path

Parameters:

  • hdfs (Array of String, or String)

    path glob patterns

Returns:

  • (org.apache.hadoop.fs.FileStatus)


35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/triglav/agent/hdfs/connection.rb', line 35

def get_latest_file_under(paths)
  entries = []
  Array(paths).each do |path|
    entries.concat(glob_files_recursively(path))
  end

  latest_entry = nil
  if entries.size > 0
    latest_entry = entries.first
    entries[1..entries.size].each do |entry|
      latest_entry = entry.modification_time > latest_entry.modification_time ? entry : latest_entry
    end
  end
  latest_entry
end

#mkdir(path) ⇒ Boolean

for test

Parameters:

  • hdfs (String)

    path

Returns:

  • (Boolean)

    true for success



55
56
57
58
# File 'lib/triglav/agent/hdfs/connection.rb', line 55

def mkdir(path)
  fs = get_fs(namespace = URI.parse(path).host)
  fs.mkdirs(Path.new(path))
end

#touch(path, overwrite = false) ⇒ Boolean

for test

Parameters:

  • hdfs (String)

    path

Returns:

  • (Boolean)

    true for success



64
65
66
67
68
69
70
71
72
73
# File 'lib/triglav/agent/hdfs/connection.rb', line 64

def touch(path, overwrite = false)
  fs = get_fs(namespace = URI.parse(path).host)
  Tempfile.create('triglav-agent-hdfs') do |fp|
    src = Path.new(fp.path)
    dst = Path.new(path) # hdfs://
    del_src = false
    overwrite = overwrite
    fs.copyFromLocalFile(del_src, overwrite, src, dst)
  end
end