Class: Telekinesis::Consumer::KCL

Inherits:
Object
  • Object
show all
Defined in:
lib/telekinesis/consumer/kcl.rb

Instance Method Summary collapse

Constructor Details

#initialize(config, &block) ⇒ KCL

Create a new consumer that consumes data from a Kinesis stream using the AWS Kinesis Client Library.

The KCL uses DynamoDB to register clients as part of the an application and evenly distribute work between all of the clients registered for the same application. See the AWS Docs for more information:

docs.aws.amazon.com/kinesis/latest/dev/developing-consumer-apps-with-kcl.html

KCLs are configured with a hash. The Kinesis ‘:stream` to consume from is required.

KCL clients operate in groups. All consumers with the same ‘:app` id use DynamoDB to attempt to distribute work evenly among themselves. The `:worker_id` is used to distinguish individual clients (`:worker_id` defaults to the current hostname. If you plan to run more than one KCL client in the same `:app` on the same host, make sure you set this to something unique!).

Any other valid KCL Worker ‘:options` may be passed as a nested hash.

For example, to configure a ‘tail` app on `some-stream` and use the default `:worker_id`, you might pass the following configuration to your KCL.

config = {
  app: 'tail',
  stream: 'some-stream',
  options: {initial_position_in_stream: 'TRIM_HORIZON'}
}

To actually process the stream, a KCL client creates record processors. These are objects that correspond to the KCL’s RecordProcessor interface - processors must implement ‘init`, `process_records`, and `shutdown` methods.

docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-implementation-app-java.html#kcl-java-interface-v2

To specify which record processor to create, pass a block to your distribtued consumer that returns a new record processor. This block may (nay, WILL) be called from a background thread so make sure that it’s thread-safe.

Telekinesis provides a BaseProcessor that implements no-op versions of all of the required methods to make writing quick processors easier and a Block processor that executes the given block every time ‘process_records` is called.

To write a simple stream tailer, you might use Block as follows:

kcl_worker = Telekinesis::Consumer::KCL.new(config) do
  Telekinesis::Consumer::BlockProcessor.new do |records, checkpointer, millis_behind_latest|
    records.each{|r| puts r}
    $stderr.puts "#{millis_behind_latest} ms behind"
    checkpointer.checkpoint
  end
end

kcl_worker.run

Raises:

  • (ArgumentError)


67
68
69
70
71
# File 'lib/telekinesis/consumer/kcl.rb', line 67

def initialize(config, &block)
  raise ArgumentError, "No block given!" unless block_given?
  kcl_config = self.class.build_config(config)
  @under = com.kickstarter.jruby.Telekinesis.new_worker(kcl_config, config[:executor], &block)
end

Instance Method Details

#as_runnableObject

Return the underlying KCL worker. It’s a java.lang.Runnable.



74
75
76
# File 'lib/telekinesis/consumer/kcl.rb', line 74

def as_runnable
  @under
end

#run(background = false) ⇒ Object

Start the KCL worker. If background is set to ‘true`, the worker is started in its own JRuby Thread and the Thread is returned. Otherwise, starts in the current thread and returns nil.



81
82
83
84
85
86
87
# File 'lib/telekinesis/consumer/kcl.rb', line 81

def run(background = false)
  if background
    Thread.new { @under.run }
  else
    @under.run
  end
end