Class: LogStash::Inputs::Kinesis
- Inherits: Base (Object → Base → LogStash::Inputs::Kinesis)
- Defined in: lib/logstash/inputs/kinesis.rb
Overview
Receive events through an AWS Kinesis stream.
This input plugin uses the Java Kinesis Client Library underneath, so the documentation at github.com/awslabs/amazon-kinesis-client will be useful.
AWS credentials can be specified either through environment variables or through an IAM instance role. The library uses a DynamoDB table for worker coordination, so you’ll need to grant access to that table as well as to the Kinesis stream. The DynamoDB table has the same name as the `application_name` configuration option, which defaults to “logstash”.
The library can optionally also send worker statistics to CloudWatch.
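As a rough illustration of the credential and permission notes above, the plain-Ruby sketch below checks for the standard `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables and lists the two resources an IAM policy would need to cover. Both helper names (`env_credentials_present?`, `required_resources`) are hypothetical and are not part of the plugin; the real lookup is delegated to the AWS SDK's default credentials provider chain.

```ruby
# Hypothetical helper, not part of the plugin: reports whether the standard
# AWS credential environment variables are both set to non-empty values.
def env_credentials_present?(env = ENV)
  %w[AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY].all? do |name|
    value = env[name]
    !value.nil? && !value.strip.empty?
  end
end

# Hypothetical helper: the DynamoDB coordination table shares its name with
# application_name, so both resources need to be covered by the IAM policy.
def required_resources(kinesis_stream_name:, application_name: "logstash")
  { dynamodb_table: application_name, kinesis_stream: kinesis_stream_name }
end
```

If the environment variables are absent, an IAM instance role is the other option the overview mentions.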
Constant Summary
- KCL = com.amazonaws.services.kinesis.clientlibrary.lib.worker
- KCL_PROCESSOR_FACTORY_CLASS = com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
Instance Attribute Summary
- #kcl_config ⇒ Object (readonly)
  Returns the value of attribute kcl_config.
Instance Method Summary
- #initialize(params = {}) ⇒ Kinesis (constructor)
  A new instance of Kinesis.
- #kcl_builder(output_queue) ⇒ Object
- #register ⇒ Object
- #run(output_queue) ⇒ Object
- #teardown ⇒ Object
- #worker_factory(output_queue) ⇒ Object
Constructor Details
#initialize(params = {}) ⇒ Kinesis
Returns a new instance of Kinesis.
    # File 'lib/logstash/inputs/kinesis.rb', line 52
    def initialize(params = {})
      super(params)
    end
Instance Attribute Details
#kcl_config ⇒ Object (readonly)
Returns the value of attribute kcl_config.
    # File 'lib/logstash/inputs/kinesis.rb', line 30
    def kcl_config
      @kcl_config
    end
Instance Method Details
#kcl_builder(output_queue) ⇒ Object
    # File 'lib/logstash/inputs/kinesis.rb', line 77
    def kcl_builder(output_queue)
      KCL::Worker::Builder.new.tap do |builder|
        builder.java_send(:recordProcessorFactory, [KCL_PROCESSOR_FACTORY_CLASS.java_class], worker_factory(output_queue))
        builder.config(@kcl_config)

        if metrics_factory
          builder.metricsFactory(metrics_factory)
        end
      end
    end
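The `tap` idiom used in `kcl_builder` can be tried on plain Ruby with a stand-in builder. In the sketch below, `FakeBuilder` and `build_with_tap` are hypothetical names that only imitate the shape of the method above; they are not the KCL API.

```ruby
# Stand-in for KCL::Worker::Builder so the sketch runs on plain Ruby.
class FakeBuilder
  attr_reader :settings

  def initialize
    @settings = {}
  end

  def config(value)
    @settings[:config] = value
    self
  end

  def metrics_factory(value)
    @settings[:metrics_factory] = value
    self
  end
end

# Object#tap yields the new builder to the block and then returns the
# builder itself, regardless of what the block's last expression evaluates to.
def build_with_tap(config, metrics_factory = nil)
  FakeBuilder.new.tap do |builder|
    builder.config(config)
    # Optional setting: only applied when a metrics factory was supplied.
    builder.metrics_factory(metrics_factory) if metrics_factory
  end
end
```

Because `tap` returns its receiver, the caller always gets the configured builder back, even when the conditional branch is skipped and the block's last expression is `nil`.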
#register ⇒ Object
    # File 'lib/logstash/inputs/kinesis.rb', line 56
    def register
      # the INFO log level is extremely noisy in KCL
      org.apache.commons.logging::LogFactory.getLog("com.amazonaws.services.kinesis").
        logger.setLevel(java.util.logging::Level::WARNING)

      worker_id = java.util::UUID.randomUUID.to_s
      creds = com.amazonaws.auth::DefaultAWSCredentialsProviderChain.new
      @kcl_config = KCL::KinesisClientLibConfiguration.new(
        @application_name,
        @kinesis_stream_name,
        creds,
        worker_id).
          withInitialPositionInStream(KCL::InitialPositionInStream::TRIM_HORIZON).
          withRegionName(@region)
    end
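The configuration chain in `register` relies on each `with...` call returning the configuration object, and on a fresh random UUID identifying each worker. A minimal plain-Ruby sketch of that shape follows; `FakeConfig` and `build_config` are hypothetical stand-ins, and `SecureRandom.uuid` takes the place of `java.util.UUID.randomUUID`.

```ruby
require "securerandom"

# Stand-in for KinesisClientLibConfiguration; not the real class.
class FakeConfig
  attr_reader :application_name, :stream_name, :worker_id,
              :initial_position, :region

  def initialize(application_name, stream_name, worker_id)
    @application_name = application_name
    @stream_name = stream_name
    @worker_id = worker_id
  end

  # Each with_* method returns self so that calls can be chained.
  def with_initial_position(position)
    @initial_position = position
    self
  end

  def with_region_name(region)
    @region = region
    self
  end
end

# A fresh random UUID per call gives every worker a distinct id.
def build_config(application_name, stream_name, region)
  FakeConfig.new(application_name, stream_name, SecureRandom.uuid).
    with_initial_position(:trim_horizon).
    with_region_name(region)
end
```

In the real configuration, `TRIM_HORIZON` asks the library to start reading from the oldest record still available in the stream.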
#run(output_queue) ⇒ Object
    # File 'lib/logstash/inputs/kinesis.rb', line 72
    def run(output_queue)
      @kcl_worker = kcl_builder(output_queue).build
      @kcl_worker.run
    end
#teardown ⇒ Object
    # File 'lib/logstash/inputs/kinesis.rb', line 88
    def teardown
      @kcl_worker.shutdown if @kcl_worker
    end
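The interplay between `run` and `teardown` can be sketched on plain Ruby, assuming the worker's `run` blocks until `shutdown` is called from another thread. `FakeWorker` and `FakeInput` are hypothetical stand-ins and do not reproduce the KCL worker's actual behavior.

```ruby
# Stand-in for the KCL worker: run blocks until shutdown is called.
class FakeWorker
  def initialize
    @signal = Queue.new
  end

  def run
    @signal.pop # blocks until shutdown pushes a value
    :stopped
  end

  def shutdown
    @signal << :stop
  end
end

# Stand-in for the input plugin, mirroring the run/teardown pair above.
class FakeInput
  def run
    @worker = FakeWorker.new
    @worker.run
  end

  # Safe to call before run: the guard skips shutdown when no worker exists.
  def teardown
    @worker.shutdown if @worker
  end
end
```

The `if @worker` guard matters because teardown may be requested before `run` has built a worker, in which case there is nothing to shut down.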
#worker_factory(output_queue) ⇒ Object
    # File 'lib/logstash/inputs/kinesis.rb', line 92
    def worker_factory(output_queue)
      proc { Worker.new(@codec.clone, output_queue, method(:decorate), @checkpoint_interval_seconds, @logger) }
    end
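`worker_factory` returns a proc, so every invocation builds a new `Worker` around its own clone of the codec while sharing one output queue. The plain-Ruby sketch below shows that pattern with hypothetical stand-ins (`FakeCodec`, `FakeProcessor`, `processor_factory`); the plugin presumably relies on JRuby converting the proc to the Java factory interface, which this sketch does not attempt to reproduce.

```ruby
# Stand-ins so the sketch runs on plain Ruby; neither is the plugin's class.
FakeCodec = Struct.new(:buffer)
FakeProcessor = Struct.new(:codec, :output_queue)

def processor_factory(codec, output_queue)
  # Each call to the proc builds a processor with its own (shallow) clone of
  # the codec; the output queue is the same object for every processor.
  proc { FakeProcessor.new(codec.clone, output_queue) }
end

factory = processor_factory(FakeCodec.new(""), Queue.new)
first = factory.call
second = factory.call
puts first.codec.equal?(second.codec)               # distinct codec objects
puts first.output_queue.equal?(second.output_queue) # shared queue
```

Cloning the codec per processor keeps any per-stream decoding state from being shared between processors that may run concurrently.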