Hdfs file input plugin for Embulk
Read files on Hdfs.
Overview
- Plugin type: file input
- Resume supported: not yet
- Cleanup supported: no
Configuration
- config_files list of paths to Hadoop's configuration files (array of strings, default:
[]) - config overwrites configuration parameters (hash, default:
{}) - path file path on Hdfs. you can use glob and Date format like
%Y%m%d/%s. - rewind_seconds When you use Date format in input_path property, the format is executed by using the time which is Now minus this property.
- partition when this is true, partition input files and increase task count. (default:
true) - num_partitions number of partitions. (default:
Runtime.getRuntime().availableProcessors())
Example
in:
type: hdfs
config_files:
- /opt/analytics/etc/hadoop/conf/core-site.xml
- /opt/analytics/etc/hadoop/conf/hdfs-site.xml
config:
fs.defaultFS: 'hdfs://hadoop-nn1:8020'
dfs.replication: 1
fs.hdfs.impl: 'org.apache.hadoop.hdfs.DistributedFileSystem'
fs.file.impl: 'org.apache.hadoop.fs.LocalFileSystem'
path: /user/embulk/test/%Y-%m-%d/*
rewind_seconds: 86400
partition: true
num_partitions: 30
decoders:
- {type: gzip}
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: "\t"
quote: ''
escape: ''
trim_if_not_quoted: true
skip_header_lines: 0
allow_extra_columns: true
allow_optional_columns: true
columns:
- {name: c0, type: string}
- {name: c1, type: string}
- {name: c2, type: string}
- {name: c3, type: long}
Note
- The parameter num_partitions is the approximate value. The actual num_partitions is larger than this parameter.
- the feature of the partition supports only 3 line terminators.
\n\r\r\n
The Reference Implementation
The Partitioning Logic
int partitionSizeByOneTask = totalFileLength / approximateNumPartitions;
/*
...
*/
int numPartitions;
if (path.toString().endsWith(".gz") || path.toString().endsWith(".bz2") || path.toString().endsWith(".lzo")) {
// if the file is compressed, skip partitioning.
numPartitions = 1;
}
else if (!task.getPartition()) {
// if no partition mode, skip partitioning.
numPartitions = 1;
}
else {
// equalize the file size per task as much as possible.
numPartitions = ((fileLength - 1) / partitionSizeByOneTask) + 1;
}
/*
...
*/
Build
$ ./gradlew gem
Development
$ ./gradlew classpath
$ bundle exec embulk run -I lib example.yml