Logstash Cassandra Output Plugin

This is a plugin for Logstash.

It is fully free and fully open source. The license is Apache 2.0, meaning you are pretty much free to use it however you want in whatever way.

It was originally a fork of the logstash-output-cassandra plugin by Oleg Tokarev, which has gone unmaintained and went through a major re-design in this version we built.

Usage


output {
    cassandra {
        # List of Cassandra hostname(s) or IP-address(es)
        hosts => [ "cass-01", "cass-02" ]

        # The port cassandra is listening to
        port => 9042

        # The protocol version to use with cassandra
        protocol_version => 4

        # Cassandra consistency level.
        # Options: "any", "one", "two", "three", "quorum", "all", "local_quorum", "each_quorum", "serial", "local_serial", "local_one"
        # Default: "one"
        consistency => 'any'

        # The keyspace to use
        keyspace => "a_ks"

        # The table to use (event level processing (e.g. %{[key]}) is supported)
        table => "%{[@metadata][cassandra_table]}"

        # Username
        username => "cassandra"

        # Password
        password => "cassandra"

        # An optional hints hash which will be used in case filter_transform or filter_transform_event_key are not in use
        # It is used to trigger a forced type casting to the cassandra driver types in
        # the form of a hash from column name to type name in the following manner:
        hints => {
            id => "int"
            at => "timestamp"
            resellerId => "int"
            errno => "int"
            duration => "float"
            ip => "inet"
        }

        # The retry policy to use (the default is the default retry policy)
        # the hash requires the name of the policy and the params it requires
        # The available policy names are:
        # * default => retry once if needed / possible
        # * downgrading_consistency => retry once with a best guess lowered consistency
        # * failthrough => fail immediately (i.e. no retries)
        # * backoff => a version of the default retry policy but with configurable backoff retries
        # The backoff options are as follows:
        # * backoff_type => either * or ** for linear and exponential backoffs respectively
        # * backoff_size => the left operand for the backoff type in seconds
        # * retry_limit => the maximum amount of retries to allow per query
        # example:
        # using { "type" => "backoff" "backoff_type" => "**" "backoff_size" => 2 "retry_limit" => 10 } will perform 10 retries with the following wait times: 1, 2, 4, 8, 16, ... 1024
        # NOTE: there is an underlying assumption that the insert query is idempotent !!!
        # NOTE: when the backoff retry policy is used, it will also be used to handle pure client timeouts and not just ones coming from the coordinator
        retry_policy => { "type" => "default" }

        # The command execution timeout
        request_timeout => 1

        # Ignore bad values
        ignore_bad_values => false

        # In Logstashes >= 2.2 this setting defines the maximum sized bulk request Logstash will make
        # You you may want to increase this to be in line with your pipeline's batch size.
        # If you specify a number larger than the batch size of your pipeline it will have no effect,
        # save for the case where a filter increases the size of an inflight batch by outputting
        # events.
        #
        # In Logstashes <= 2.1 this plugin uses its own internal buffer of events.
        # This config option sets that size. In these older logstashes this size may
        # have a significant impact on heap usage, whereas in 2.2+ it will never increase it.
        # To make efficient bulk API calls, we will buffer a certain number of
        # events before flushing that out to Cassandra. This setting
        # controls how many events will be buffered before sending a batch
        # of events. Increasing the `flush_size` has an effect on Logstash's heap size.
        # Remember to also increase the heap size using `LS_HEAP_SIZE` if you are sending big commands
        # or have increased the `flush_size` to a higher value.
        flush_size => 500

        # The amount of time since last flush before a flush is forced.
        #
        # This setting helps ensure slow event rates don't get stuck in Logstash.
        # For example, if your `flush_size` is 100, and you have received 10 events,
        # and it has been more than `idle_flush_time` seconds since the last flush,
        # Logstash will flush those 10 events automatically.
        #
        # This helps keep both fast and slow log streams moving along in
        # near-real-time.
        idle_flush_time => 1
    }
}

Running Plugin in Logstash

Run in a local Logstash clone

Edit Logstash Gemfile and add the local plugin path, for example:

gem "logstash-output-cassandra", :path => "/your/local/logstash-output-cassandra"

And install by executing:

bin/plugin install --no-verify

Or install plugin from RubyGems:

bin/plugin install logstash-output-cassandra

And then run Logstash with the plugin:

bin/logstash -e 'output {cassandra {}}'

Run in an installed Logstash

You can use the same method to run your plugin in an installed Logstash by editing its Gemfile and pointing the :path to your local plugin development directory or you can build the gem and install it using:

Build your plugin gem

gem build logstash-output-cassandra.gemspec

Install the plugin from the Logstash home

bin/plugin install /your/local/plugin/logstash-output-cassandra.gem

Run Logstash with the plugin

bin/logstash -e 'output {cassandra {}}'

TODO

  • Finish integration specs
    • it "properly works with counter columns"
    • it "properly adds multiple events to multiple tables in the same bulk"
  • Improve retries to include (but probably only handle Errors::Timeout and Errors::NoHostsAvailable):
    • #get_query
    • #execute_async
  • Upgrade / test with logstash 2.3
  • Upgrade / test with cassandra 3