Class: LogStash::Inputs::Kinesis

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/kinesis.rb

Overview

Receive events through an AWS Kinesis stream.

This input plugin uses the Java Kinesis Client Library underneath, so the documentation at github.com/awslabs/amazon-kinesis-client will be useful.

AWS credentials can be specified either through environment variables or through an IAM instance role. The library uses a DynamoDB table for worker coordination, so you’ll need to grant access to that table as well as to the Kinesis stream. The DynamoDB table has the same name as the `application_name` configuration option, which defaults to “logstash”.

The library can optionally also send worker statistics to CloudWatch.

Constant Summary collapse

KCL =
com.amazonaws.services.kinesis.clientlibrary.lib.worker
KCL_PROCESSOR_FACTORY_CLASS =
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(params = {}) ⇒ Kinesis

Returns a new instance of Kinesis.



52
53
54
# File 'lib/logstash/inputs/kinesis.rb', line 52

# Creates the plugin instance.
#
# @param params [Hash] plugin settings; passed on unchanged to the parent
#   class constructor
def initialize(params = {})
  # Bare `super` forwards this method's own arguments (i.e. `params`).
  super
end

Instance Attribute Details

#kcl_config ⇒ Object (readonly)

Returns the value of attribute kcl_config.



30
31
32
# File 'lib/logstash/inputs/kinesis.rb', line 30

# Reader for the @kcl_config instance variable, which #register assigns.
#
# @return [Object, nil] the stored configuration, or nil if it has not been
#   assigned yet
def kcl_config
  @kcl_config
end

Instance Method Details

#kcl_builder(output_queue) ⇒ Object



77
78
79
80
81
82
83
84
85
86
# File 'lib/logstash/inputs/kinesis.rb', line 77

# Assembles a KCL worker builder wired to this plugin.
#
# @param output_queue [Object] queue handed to #worker_factory
# @return [Object] a KCL::Worker::Builder with the record processor factory
#   and @kcl_config applied, plus the metrics factory when one is available
def kcl_builder(output_queue)
  worker_builder = KCL::Worker::Builder.new

  # java_send with an explicit signature selects the recordProcessorFactory
  # overload that takes KCL_PROCESSOR_FACTORY_CLASS.
  worker_builder.java_send(
    :recordProcessorFactory,
    [KCL_PROCESSOR_FACTORY_CLASS.java_class],
    worker_factory(output_queue)
  )
  worker_builder.config(@kcl_config)

  # Only override the metrics factory when metrics_factory yields one.
  worker_builder.metricsFactory(metrics_factory) if metrics_factory

  worker_builder
end

#register ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/logstash/inputs/kinesis.rb', line 56

# Prepares the plugin: lowers KCL log verbosity and builds the KCL
# configuration, storing it in @kcl_config.
#
# @return [Object] the configuration assigned to @kcl_config
def register
  # KCL is extremely chatty at INFO, so cap its logger at WARNING.
  kcl_log = org.apache.commons.logging::LogFactory.getLog("com.amazonaws.services.kinesis")
  kcl_log.logger.setLevel(java.util.logging::Level::WARNING)

  # Each registration gets a fresh random worker identifier.
  worker_id = java.util::UUID.randomUUID.to_s
  creds = com.amazonaws.auth::DefaultAWSCredentialsProviderChain.new

  base_config = KCL::KinesisClientLibConfiguration.new(
    @application_name,
    @kinesis_stream_name,
    creds,
    worker_id
  )

  @kcl_config = base_config
    .withInitialPositionInStream(KCL::InitialPositionInStream::TRIM_HORIZON)
    .withRegionName(@region)
end

#run(output_queue) ⇒ Object



72
73
74
75
# File 'lib/logstash/inputs/kinesis.rb', line 72

# Builds the KCL worker, remembers it in @kcl_worker, and runs it.
#
# @param output_queue [Object] queue passed through to #kcl_builder
# @return [Object] whatever the worker's #run returns
def run(output_queue)
  builder = kcl_builder(output_queue)
  # Keep a handle on the worker so #teardown can shut it down later.
  @kcl_worker = builder.build
  @kcl_worker.run
end

#teardown ⇒ Object



88
89
90
# File 'lib/logstash/inputs/kinesis.rb', line 88

# Shuts down the KCL worker if one has been stored in @kcl_worker.
#
# @return [Object, nil] the result of the worker's #shutdown, or nil when no
#   worker is set
def teardown
  # Nothing to stop when #run has not stored a worker.
  return unless @kcl_worker

  @kcl_worker.shutdown
end

#worker_factory(output_queue) ⇒ Object



92
93
94
# File 'lib/logstash/inputs/kinesis.rb', line 92

# Returns a proc that creates a new Worker on every call.
#
# @param output_queue [Object] queue given to each Worker
# @return [Proc] factory producing Worker instances
def worker_factory(output_queue)
  proc do
    Worker.new(
      @codec.clone, # each Worker receives its own copy of the codec
      output_queue,
      method(:decorate),
      @checkpoint_interval_seconds,
      @logger
    )
  end
end