S3 Parquet output plugin for Embulk
Embulk output plugin to dump records as Apache Parquet files on S3.
Overview
- Plugin type: output
- Load all or nothing: no
- Resume supported: no
- Cleanup supported: yes
Configuration
- bucket: s3 bucket name (string, required)
- path_prefix: prefix of target keys (string, optional)
- sequence_format: format of the sequence number of the output files (string, default:
"%03d.%02d."
)- sequence_format formats task index and sequence number in a task.
- file_ext: path suffix of the output files (string, default:
"parquet"
) - compression_codec: compression codec for parquet file (
"uncompressed"
,"snappy"
,"gzip"
,"lzo"
,"brotli"
,"lz4"
or"zstd"
, default:"uncompressed"
) - default_timestamp_format: default timestamp format (string, default:
"%Y-%m-%d %H:%M:%S.%6N %z"
) - default_timezone: default timezone (string, default:
"UTC"
) - column_options: a map whose keys are name of columns, and values are configuration with following parameters (optional)
- timezone: timezone if type of this column is timestamp. If not set, default_timezone is used. (string, optional)
- format: timestamp format if type of this column is timestamp. If not set, default_timestamp_format: is used. (string, optional)
- logical_type: a Parquet logical type name (
timestamp-millis
,timestamp-micros
,json
,int8
,int16
,int32
,int64
,uint8
,uint16
,uint32
,uint64
) (string, optional)
- canned_acl: grants one of canned ACLs for created objects (string, default:
private
) - block_size: The block size is the size of a row group being buffered in memory. This limits the memory usage when writing. Larger values will improve the I/O when reading but consume more memory when writing. (int, default:
134217728
(128MB)) - page_size: The page size is for compression. When reading, each page can be decompressed independently. A block is composed of pages. The page is the smallest unit that must be read fully to access a single record. If this value is too small, the compression will deteriorate. (int, default:
1048576
(1MB)) - max_padding_size: The max size (bytes) to write as padding and the min size of a row group (int, default:
8388608
(8MB)) - enable_dictionary_encoding: The boolean value is to enable/disable dictionary encoding. (boolean, default:
true
) auth_method: name of mechanism to authenticate requests (
"basic"
,"env"
,"instance"
,"profile"
,"properties"
,"anonymous"
,"session"
,"web_identity_token"
, default:"default"
)"basic"
: uses access_key_id and secret_access_key to authenticate."env"
: usesAWS_ACCESS_KEY_ID
(orAWS_ACCESS_KEY
) andAWS_SECRET_KEY
(orAWS_SECRET_ACCESS_KEY
) environment variables."instance"
: uses EC2 instance profile or attached ECS task role."profile"
: uses credentials written in a file. Format of the file is as following, where[...]
is a name of profile. ``` [default] aws_access_key_id=YOUR_ACCESS_KEY_ID aws_secret_access_key=YOUR_SECRET_ACCESS_KEY
[profile2] ...
- `"properties"`: uses aws.accessKeyId and aws.secretKey Java system properties. - `"anonymous"`: uses anonymous access. This auth method can access only public files. - `"session"`: uses temporary-generated **access_key_id**, **secret_access_key** and **session_token**. - `"assume_role"`: uses temporary-generated credentials by assuming **role_arn** role. - `"web_identity_token"`: uses temporary-generated credentials by assuming **role_arn** role with web identity. - `"default"`: uses AWS SDK's default strategy to look up available credentials from runtime environment. This method behaves like the combination of the following methods. 1. `"env"` 1. `"properties"` 1. `"profile"` 1. `"instance"`
profile_file: path to a profiles file. this is optionally used when auth_method is
"profile"
. (string, default: given byAWS_CREDENTIAL_PROFILES_FILE
environment variable, or ~/.aws/credentials).profile_name: name of a profile. this is optionally used when auth_method is
"profile"
. (string, default:"default"
)access_key_id: aws access key id. this is required when auth_method is
"basic"
or"session"
. (string, optional)secret_access_key: aws secret access key. this is required when auth_method is
"basic"
or"session"
. (string, optional)session_token: aws session token. this is required when auth_method is
"session"
. (string, optional)role_arn: arn of the role to assume. this is required for auth_method is
"assume_role"
or"web_identity_token"
. (string, optional)role_session_name: an identifier for the assumed role session. this is required when auth_method is
"assume_role"
or"web_identity_token"
. (string, optional)role_external_id: a unique identifier that is used by third parties when assuming roles in their customers' accounts. this is optionally used for auth_method:
"assume_role"
. (string, optional)role_session_duration_seconds: duration, in seconds, of the role session. this is optionally used for auth_method:
"assume_role"
. (int, optional)web_identity_token_file: the absolute path to the web identity token file. this is required when auth_method is
"web_identity_token"
. (string, optional)scope_down_policy: an iam policy in json format. this is optionally used for auth_method:
"assume_role"
. (string, optional)catalog: Register a table if this option is specified (optional)
- catalog_id: glue data catalog id if you use a catalog different from account/region default catalog. (string, optional)
- database: The name of the database (string, required)
- table: The name of the table (string, required)
- column_options: a key-value pairs where key is a column name and value is options for the column. (string to options map, default:
{}
) - type: type of column when this plugin creates new tables (e.g.
string
,bigint
) (string, default: depends on the input embulk column type, or the parquet logical type. See the below table)
|embulk column type|glue data type| |:---|:---| |long|bigint| |boolean|boolean| |double|double| |string|string| |timestamp|string| |json|string|
|parquet logical type|glue data type|note| |:---|:---|:---| |timestamp-millis|timestamp|| |timestamp-micros|long|Glue cannot recognize timestamp-micros.| |int8|tinyint|| |int16|smallint|| |int32|int|| |int64|bigint|| |uint8|smallint|Glue tinyint is a minimum value of -2^7 and a maximum value of 2^7-1| |uint16|int|Glue smallint is a minimum value of -2^15 and a maximum value of 2^15-1.| |uint32|bigint|Glue int is a minimum value of-2^31 and a maximum value of 2^31-1.| |uint64|ConfigException|Glue bigint supports only a 64-bit signed integer.| |json|string||
- operation_if_exists: operation if the table already exist. Available operations are
"delete"
and"skip"
(string, default:"delete"
)
endpoint: The AWS Service endpoint (string, optional)
region: The AWS region (string, optional)
http_proxy: Indicate whether using when accessing AWS via http proxy. (optional)
- host proxy host (string, required)
- port proxy port (int, optional)
- protocol proxy protocol (string, default:
"https"
) - user proxy user (string, optional)
- password proxy password (string, optional)
buffer_dir: buffer directory for parquet files to be uploaded on S3 (string, default: Create a Temporary Directory)
type_options: a map whose keys are name of embulk type(
boolean
,long
,double
,string
,timestamp
,json
), and values are configuration with following parameters (optional)- logical_type: a Parquet logical type name (
timestamp-millis
,timestamp-micros
,json
,int8
,int16
,int32
,int64
,uint8
,uint16
,uint32
,uint64
) (string, optional)
- logical_type: a Parquet logical type name (
Example
out:
type: s3_parquet
bucket: my-bucket
path_prefix: path/to/my-obj.
file_ext: snappy.parquet
compression_codec: snappy
default_timezone: Asia/Tokyo
canned_acl: bucket-owner-full-control
Note
- The current Parquet LogicalTypes implementation does only old representation.
- Some kind of LogicalTypes are sometimes not supported on your middleware. Be careful to giving logical type name.
Development
Run example:
$ ./run_s3_local.sh
$ ./example/prepare_s3_bucket.sh
$ ./gradlew classpath
$ embulk run example/config.yml -Ilib
Run test:
$ ./run_s3_local.sh
$ ./gradlew test
Build
$ ./gradlew gem # -t to watch change of files and rebuild continuously
Release gem:
Fix build.gradle, then
$ ./gradlew gemPush
ChangeLog
Contributors
- @syucream