MongoDB input plugin for Embulk


MongoDB input plugin for Embulk loads records from MongoDB. This plugin loads each document as a single-column record (the column name is "record" by default; see json_column_name). You can use filter plugins such as embulk-filter-expand_json or embulk-filter-add_time to convert the JSON column into typed columns. The rename filter is also useful for renaming the typed columns.

Overview

This plugin works only with Embulk 0.8.8 or later.

  • Plugin type: input
  • Guess supported: no

Configuration

  • Connection parameters: one of the following two methods is required.

    • Use a MongoDB connection string URI:
      • uri: MongoDB connection string URI (e.g. 'mongodb://localhost:27017/mydb') (string, required)
    • Use separate connection parameters:
      • hosts: list of hosts. Each entry is a pair of host (string, required) and port (integer, optional, default: 27017)
      • user: (string, optional)
      • password: (string, optional)
      • database: (string, required)
  • collection: source collection name (string, required)

  • fields: (deprecated; previously array, required) hashes with the following two keys: name (the column name) and type (the column type: boolean, long, double, string, or timestamp)

  • id_field_name: name of the ObjectId field. Set this if you want to change the default name _id (string, optional, default: "_id")

  • query: a JSON document used to query the source collection. Only documents that match this condition are loaded from the collection. (string, optional)

  • projection: a JSON document used as a projection on the query results. Only the fields selected by this projection are included in each loaded document. (string, optional)

  • sort: a JSON document that specifies the ordering of results (string, optional)

  • batch_size: Limits the number of objects returned in one batch (integer, optional, default: 10000)

  • incremental_field: list of field names used for incremental loading (list, optional; cannot be used together with sort)

  • last_record: the last loaded record, used for incremental loading (hash, optional)

  • stop_on_invalid_record: stop the bulk load transaction if a document includes an invalid record (such as an unsupported object type) (boolean, optional, default: false)

  • json_column_name: name of the JSON column in the output (string, optional, default: "record")

Example

Exporting all objects

Using a MongoDB connection string URI:

in:
  type: mongodb
  uri: mongodb://myuser:mypassword@localhost:27017/my_database
  collection: "my_collection"

Using separate connection parameters:

in:
  type: mongodb
  hosts:
  - {host: localhost, port: 27017}
  - {host: example.com, port: 27017}
  user: myuser
  password: mypassword
  database: my_database
  collection: "my_collection"
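The separate parameters carry the same information as the standard MongoDB connection string format: credentials, a comma-separated host list, and a database. The minimal Python sketch below shows that relationship. `build_mongodb_uri` is a hypothetical helper and is not part of the plugin, which may build its client from these parameters differently. As in the configuration above, the port defaults to 27017 when omitted. The user name and password are percent-encoded because reserved characters such as `@` or `:` would otherwise break the URI.

```python
from urllib.parse import quote_plus


def build_mongodb_uri(hosts, database, user=None, password=None):
    """Hypothetical helper: build a mongodb:// URI from separate parameters.

    hosts is a list of dicts such as {"host": "localhost", "port": 27017};
    "port" is optional and defaults to 27017.
    """
    if not hosts:
        raise ValueError("at least one host is required")
    # Host list: host:port entries joined by commas.
    host_part = ",".join(f"{h['host']}:{h.get('port', 27017)}" for h in hosts)
    # Percent-encode credentials so characters like '@' or ':' are safe.
    cred_part = ""
    if user is not None:
        cred_part = quote_plus(user)
        if password is not None:
            cred_part += ":" + quote_plus(password)
        cred_part += "@"
    return f"mongodb://{cred_part}{host_part}/{database}"


print(build_mongodb_uri(
    [{"host": "localhost", "port": 27017}, {"host": "example.com", "port": 27017}],
    "my_database", "myuser", "mypassword",
))
```

For example, a password of `p@ss` is written as `p%40ss` in the URI.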

Filtering documents by query and projection

in:
  type: mongodb
  uri: mongodb://myuser:mypassword@localhost:27017/my_database
  collection: "my_collection"
  query: '{ field1: { $gte: 3 } }'
  projection: '{ "_id": 1, "field1": 1 }'
  sort: '{ "field1": 1 }'
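To make the effect of query, sort and projection concrete, here is a small in-memory Python sketch. It applies the `$gte` condition on `field1`, sorts ascending by `field1`, and keeps only `_id` and `field1` through an inclusion projection. In MongoDB a projection cannot mix inclusion (1) and exclusion (0), except for `_id`. Listing only the included fields therefore drops everything else, `field2` included. This is an illustration, not how the plugin or MongoDB evaluates queries. It assumes numeric values and does not model MongoDB's cross-type BSON ordering. The helper names are hypothetical.

```python
def matches_gte(doc, field, bound):
    # { field: { $gte: bound } }: the field must exist and be >= bound.
    # Assumes comparable (e.g. numeric) values; BSON cross-type ordering
    # is not modeled here.
    return field in doc and doc[field] >= bound


def project(doc, fields):
    # Inclusion projection: keep only the listed fields that are present.
    return {name: doc[name] for name in fields if name in doc}


def run_example(docs):
    # query: '{ field1: { $gte: 3 } }'
    selected = [d for d in docs if matches_gte(d, "field1", 3)]
    # sort: '{ "field1": 1 }' (ascending)
    selected.sort(key=lambda d: d["field1"])
    # projection: keep _id and field1 only
    return [project(d, ["_id", "field1"]) for d in selected]


docs = [
    {"_id": 1, "field1": 5, "field2": "a"},
    {"_id": 2, "field1": 2, "field2": "b"},
    {"_id": 3, "field1": 3, "field2": "c"},
]
print(run_example(docs))  # [{'_id': 3, 'field1': 3}, {'_id': 1, 'field1': 5}]
```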

Incremental loading

in:
  type: mongodb
  uri: mongodb://myuser:mypassword@localhost:27017/my_database
  collection: "my_collection"
  query: '{ field1: { $gt: 3 } }'
  projection: '{ "_id": 1, "field1": 1, "field2": 1 }'
  incremental_field:
    - "field2"
  last_record: {"field2": 13215}

The plugin builds a new query and sort value from these options, as shown below. You cannot use the incremental_field option together with the sort option.

query: { field1: { $gt: 3 }, field2: { $gt: 13215 } }
sort:  { "field2": 1 }  # field2 ascending
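The transformation described above can be sketched in Python. For a single incremental field, this minimal sketch reproduces the query and sort in the example: a `$gt` condition on the last loaded value is added to the user's query, and the sort runs ascending on the incremental field. Several things are assumptions, not documented plugin behavior: how multiple incremental fields are combined, and what happens when the query already constrains the incremental field (here it is simply replaced). `build_incremental_query` is a hypothetical name.

```python
import copy


def build_incremental_query(query, incremental_fields, last_record):
    """Hypothetical sketch: return (query, sort) for the next incremental run."""
    # Start from a copy of the user's query so the original is not mutated.
    new_query = copy.deepcopy(query) if query else {}
    if last_record:
        for field in incremental_fields:
            if field in last_record:
                # Load only documents strictly after the last loaded value.
                # Assumption: an existing condition on this field is replaced.
                new_query[field] = {"$gt": last_record[field]}
    # Ascending sort on each incremental field, in the listed order.
    sort = {field: 1 for field in incremental_fields}
    return new_query, sort


query, sort = build_incremental_query(
    {"field1": {"$gt": 3}}, ["field2"], {"field2": 13215}
)
print(query)  # {'field1': {'$gt': 3}, 'field2': {'$gt': 13215}}
print(sort)   # {'field2': 1}
```

On the first run, before any last_record exists, only the sort is added and the user's query is used unchanged.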

When the incremental field is of type ObjectId or DateTime, specify last_record using MongoDB Extended JSON notation ($oid or $date):

# ObjectId field
in:
  type: mongodb
  incremental_field:
    - "_id"
  last_record: {"_id": {"$oid": "5739b2261c21e58edfe39716"}}

# DateTime field
in:
  type: mongodb
  incremental_field:
    - "time_field"
  last_record: {"time_field": {"$date": "2015-01-25T13:23:15.000Z"}}
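If you generate last_record values from a script, the following minimal Python sketch builds the same Extended JSON shapes as the two examples above. An ObjectId becomes `{"$oid": "<24 hex chars>"}`. A timezone-aware datetime becomes `{"$date": "<ISO-8601 UTC with milliseconds>"}`. Both helper names are hypothetical. Because JSON is valid YAML flow syntax, the `json.dumps` output can be pasted after `last_record:`.

```python
import json
import string
from datetime import datetime, timezone


def oid_last_record(field, hex_id):
    # An ObjectId is 12 bytes, written as 24 hexadecimal characters.
    if len(hex_id) != 24 or not all(c in string.hexdigits for c in hex_id):
        raise ValueError(f"not an ObjectId hex string: {hex_id!r}")
    return {field: {"$oid": hex_id.lower()}}


def date_last_record(field, value):
    # Require a timezone-aware datetime so the UTC conversion is unambiguous.
    if value.tzinfo is None:
        raise ValueError("datetime must be timezone-aware")
    utc = value.astimezone(timezone.utc)
    # ISO-8601 in UTC with millisecond precision, as in the example above.
    text = utc.strftime("%Y-%m-%dT%H:%M:%S") + f".{utc.microsecond // 1000:03d}Z"
    return {field: {"$date": text}}


print(json.dumps(oid_last_record("_id", "5739b2261c21e58edfe39716")))
# {"_id": {"$oid": "5739b2261c21e58edfe39716"}}
print(json.dumps(date_last_record(
    "time_field", datetime(2015, 1, 25, 13, 23, 15, tzinfo=timezone.utc))))
# {"time_field": {"$date": "2015-01-25T13:23:15.000Z"}}
```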

Running an incremental load

$ embulk run /path/to/config.yml -c config-diff.yml
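With -c, Embulk writes a configuration diff to config-diff.yml after a run. On the next run it reads the file back and merges the diff into config.yml, which is how the updated last_record reaches the next load. The Python sketch below models that merge as a recursive dictionary merge in which the diff wins. This is an assumption about the merge semantics, not Embulk's code. The diff contents (`in.last_record` with a new `field2` value of 14000) are hypothetical.

```python
import copy


def merge_config(base, diff):
    """Recursively merge a config diff into a base config; values in diff win."""
    merged = copy.deepcopy(base)
    for key, value in diff.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_config(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


config = {
    "in": {
        "type": "mongodb",
        "collection": "my_collection",
        "incremental_field": ["field2"],
        "last_record": {"field2": 13215},
    }
}
# Hypothetical diff written after a run that loaded up to field2 = 14000.
config_diff = {"in": {"last_record": {"field2": 14000}}, "out": {}}

print(merge_config(config, config_diff)["in"]["last_record"])  # {'field2': 14000}
```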

Advanced usage with filter plugins

in:
  type: mongodb
  uri: mongodb://myuser:mypassword@localhost:27017/my_database
  collection: "my_collection"
  query: '{ "age": { $gte: 3 } }'
  projection: '{ "_id": 1, "age": 1, "ts": 1, "firstName": 1, "lastName": 1 }'

filters:
  # convert json column into typed columns
  - type: expand_json
    json_column_name: record
    expanded_columns:
      - {name: _id, type: long}
      - {name: ts, type: string}
      - {name: firstName, type: string}
      - {name: lastName, type: string}

  # rename column names
  - type: rename
    columns:
      _id: id
      firstName: first_name
      lastName: last_name

  # convert string "ts" column into timestamp "time" column
  - type: add_time
    from_column:
      name: ts
      timestamp_format: "%Y-%m-%dT%H:%M:%S.%N%z"
    to_column:
      name: time
      type: timestamp
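To show what the filter chain above does to a single row, here is a rough Python model of the three steps. First, expand_json keeps the listed columns with their types and drops the rest (such as `age`). Then rename applies the column mapping. Finally, add_time parses `ts` into a new `time` column. This sketch does not show the real plugins' code. Its assumptions: missing keys become null, and `ts` is kept alongside `time` (check the add_time documentation). Python's `%f` stands in for Embulk's `%N` and accepts at most six fractional digits. The helper names and sample values are hypothetical.

```python
import json
from datetime import datetime

# Mirrors the rename filter's "columns" mapping in the example config.
RENAMES = {"_id": "id", "firstName": "first_name", "lastName": "last_name"}


def to_long(value):
    return None if value is None else int(value)


def to_string(value):
    return None if value is None else str(value)


def transform(record_json):
    doc = json.loads(record_json)
    # expand_json: keep only the listed columns, cast to their types;
    # missing keys become None (null) and other keys are dropped.
    row = {
        "_id": to_long(doc.get("_id")),
        "ts": to_string(doc.get("ts")),
        "firstName": to_string(doc.get("firstName")),
        "lastName": to_string(doc.get("lastName")),
    }
    # rename: apply the column-name mapping.
    row = {RENAMES.get(name, name): value for name, value in row.items()}
    # add_time: parse "ts" into a new timestamp column "time".
    row["time"] = (
        None if row["ts"] is None
        else datetime.strptime(row["ts"], "%Y-%m-%dT%H:%M:%S.%f%z")
    )
    return row


record = ('{"_id": 42, "age": 30, "ts": "2016-05-16T11:42:30.123+0000", '
          '"firstName": "Ada", "lastName": "Lovelace"}')
print(transform(record))
```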

Build

$ ./gradlew gem

Test

$ ./gradlew test  # add -t to watch for file changes and rebuild continuously

To run the unit tests, set the following environment variables.

If these environment variables are not set, most test cases are skipped.

MONGO_URI
MONGO_COLLECTION
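Before running the test suite, you can check that both variables are visible to the process. This is a small, hypothetical Python pre-flight check. It is not part of the project's Gradle build. It only reports which of the two variables above are unset or empty.

```python
import os

REQUIRED_VARS = ("MONGO_URI", "MONGO_COLLECTION")


def missing_test_env(environ=os.environ):
    """Return the names of required test variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not environ.get(name)]


if __name__ == "__main__":
    missing = missing_test_env()
    if missing:
        print("Most MongoDB tests will be skipped; missing:", ", ".join(missing))
    else:
        print("All required test environment variables are set.")
```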

If you are using Mac OS X El Capitan and GUI applications (such as an IDE), you can set them as follows.

$ vi ~/Library/LaunchAgents/environment.plist
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
  <key>Label</key>
  <string>my.startup</string>
  <key>ProgramArguments</key>
  <array>
    <string>sh</string>
    <string>-c</string>
    <string>
      launchctl setenv MONGO_URI mongodb://myuser:mypassword@localhost:27017/my_database
      launchctl setenv MONGO_COLLECTION my_collection
    </string>
  </array>
  <key>RunAtLoad</key>
  <true/>
</dict>
</plist>

$ launchctl load ~/Library/LaunchAgents/environment.plist
$ launchctl getenv MONGO_URI  # check that the value is set

Then start your applications.