MongoDB input plugin for Embulk
MongoDB input plugin for Embulk loads records from MongoDB. This plugin loads documents as single-column records (column name is "record"). You can use filter plugins such as embulk-filter-expand_json or embulk-filter-add_time to convert the json column to typed columns. Rename filter is also useful to rename the typed columns.
Overview
This plugin only works with embulk >= 0.8.8.
- Plugin type: input
- Guess supported: no
Configuration
Connection parameters One of them is required.
- use MongoDB connection string URI
- uri: MongoDB connection string URI (e.g. 'mongodb://localhost:27017/mydb') (string, required)
- use separated URI parameters
- hosts: list of hosts.
hosts
are pairs of host(string, required) and port(integer, optional, default: 27017) - user: (string, optional)
- password: (string, optional)
- database: (string, required)
collection: source collection name (string, required)
fields: (deprecated) ~~hash records that has the following two fields (array, required)~~ ~~- name: Name of the column~~ ~~- type: Column types as follows~~ ~~- boolean~~ ~~- long~~ ~~- double~~ ~~- string~~ ~~- timestamp~~
id_field_name Name of Object ID field name. Set if you want to change the default name
_id
(string, optional, default: "_id")query: A JSON document used for querying on the source collection. Documents are loaded from the colleciton if they match with this condition. (string, optional)
projection: A JSON document used for projection on query results. Fields in a document are used only if they match with this condition. (string, optional)
sort: Ordering of results (string, optional)
batch_size: Limits the number of objects returned in one batch (integer, optional, default: 10000)
incremental_field List of field name (list, optional, can't use with sort option)
last_record Last loaded record for incremental load (hash, optional)
stop_on_invalid_record Stop bulk load transaction if a document includes invalid record (such as unsupported object type) (boolean, optional, default: false)
json_column_name: column name used in outputs (string, optional, default: "record")
Example
Exporting all objects
Specify with MongoDB connection string URI.
in:
type: mongodb
uri: mongodb://myuser:mypassword@localhost:27017/my_database
collection: "my_collection"
Specify with separated URI parameters.
in:
type: mongodb
hosts:
- {host: localhost, port: 27017}
- {host: example.com, port: 27017}
user: myuser
password: mypassword
database: my_database
collection: "my_collection"
Filtering documents by query and projection
in:
type: mongodb
uri: mongodb://myuser:mypassword@localhost:27017/my_database
collection: "my_collection"
query: '{ field1: { $gte: 3 } }'
projection: '{ "_id": 1, "field1": 1, "field2": 0 }'
sort: '{ "field1": 1 }'
Incremental loading
in:
type: mongodb
uri: mongodb://myuser:mypassword@localhost:27017/my_database
collection: "my_collection"
query: '{ field1: { $gt: 3 } }'
projection: '{ "_id": 1, "field1": 1, "field2": 1 }'
incremental_field:
- "field2"
last_record: {"field2": 13215}
Plugin will create new query and sort value.
You can't use incremental_field
option with sort
option at the same time.
query { field1: { $gt: 3 }, field2: { $gt: 13215}}
sort {"field2", 1} # field2 ascending
You have to specify last_record with special characters when field type is ObjectId
or DateTime
.
# ObjectId field
in:
type: mongodb
incremental_field:
- "_id"
last_record: {"_id": {"$oid": "5739b2261c21e58edfe39716"}}
# DateTime field
in:
type: mongodb
incremental_field:
- "time_field"
last_record: {"time_field": {"$date": "2015-01-25T13:23:15.000Z"}}
Run Incremental load
$ embulk run /path/to/config.yml -c config-diff.yml
Advanced usage with filter plugins
in:
type: mongodb
uri: mongodb://myuser:mypassword@localhost:27017/my_database
collection: "my_collection"
query: '{ "age": { $gte: 3 } }'
projection: '{ "_id": 1, "age": 1, "ts": 1, "firstName": 1, "lastName": 1 }'
filters:
# convert json column into typed columns
- type: expand_json
json_column_name: record
expanded_columns:
- {name: _id, type: long}
- {name: ts, type: string}
- {name: firstName, type: string}
- {name: lastName, type: string}
# rename column names
- type: rename
columns:
_id: id
firstName: first_name
lastName: last_name
# convert string "ts" column into timestamp "time" column
- type: add_time
from_column:
name: ts
timestamp_format: "%Y-%m-%dT%H:%M:%S.%N%z"
to_column:
name: time
type: timestamp
Build
$ ./gradlew gem
Test
$ ./gradlew test # -t to watch change of files and rebuild continuously
To run unit tests, we need to configure the following environment variables.
When environment variables are not set, skip almost test cases.
MONGO_URI
MONGO_COLLECTION
If you're using Mac OS X El Capitan and GUI Applications(IDE), like as follows.
$ vi ~/Library/LaunchAgents/environment.plist
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key>
<string>my.startup</string>
<key>ProgramArguments</key>
<array>
<string>sh</string>
<string>-c</string>
<string>
launchctl setenv MONGO_URI mongodb://myuser:mypassword@localhost:27017/my_database
launchctl setenv MONGO_COLLECTION my_collection
</string>
</array>
<key>RunAtLoad</key>
<true/>
</dict>
</plist>
$ launchctl load ~/Library/LaunchAgents/environment.plist
$ launchctl getenv MONGO_URI //try to get value.
Then start your applications.