Class: Telekinesis::Consumer::Block

Inherits:
BaseProcessor
Defined in:
lib/telekinesis/consumer/block.rb

Overview

A RecordProcessor that delegates record processing to the given block. Useful for quickly defining a consumer without writing a dedicated processor class.

Telekinesis::Consumer::Worker.new(stream: 'my-stream', app: 'tail') do
  Telekinesis::Consumer::Block.new do |records, checkpointer, millis_behind_latest|
    records.each { |r| puts r }
    $stderr.puts "#{millis_behind_latest} ms behind"
    checkpointer.checkpoint
  end
end

Instance Method Summary

Methods inherited from BaseProcessor

#init, #shutdown

Constructor Details

#initialize(&block) ⇒ Block

Returns a new instance of Block.

Raises:

  • (ArgumentError)


# File 'lib/telekinesis/consumer/block.rb', line 14

def initialize(&block)
  raise ArgumentError, "No block given" unless block_given?
  @block = block
end
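
The guard in the constructor can be exercised in isolation. The sketch below uses a hypothetical stand-in class, `BlockSketch`, that copies the constructor shown above so that it runs without the telekinesis gem; it is not the library class itself, and the helper `construct_without_block` is likewise invented for illustration.

```ruby
# Hypothetical stand-in that mirrors the constructor above; not the library class.
class BlockSketch
  def initialize(&block)
    raise ArgumentError, "No block given" unless block_given?
    @block = block
  end
end

# Hypothetical helper: returns the message of the ArgumentError raised when
# constructing without a block, or nil if no error was raised.
def construct_without_block
  BlockSketch.new
  nil
rescue ArgumentError => e
  e.message
end

puts construct_without_block.inspect
```

Passing any block, even an empty one, satisfies the guard; the block is stored and only called later.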

Instance Method Details

#process_records(input) ⇒ Object

Calls the block given to the constructor with the input's records, checkpointer, and millis_behind_latest.



# File 'lib/telekinesis/consumer/block.rb', line 19

def process_records(input)
  @block.call(input.records, input.checkpointer, input.millis_behind_latest)
end
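
As a rough illustration of this delegation, the sketch below feeds a fake input object through a stand-in processor. `BlockSketch`, `FakeInput`, `FakeCheckpointer`, and `run_block_sketch` are hypothetical names invented for this example. The real `input` is supplied by the consumer at runtime, and only the three readers used above (`records`, `checkpointer`, `millis_behind_latest`) are assumed.

```ruby
# Hypothetical stand-in for the input object; the real one comes from the consumer.
FakeInput = Struct.new(:records, :checkpointer, :millis_behind_latest)

# Hypothetical checkpointer that counts calls instead of persisting anything.
class FakeCheckpointer
  attr_reader :calls

  def initialize
    @calls = 0
  end

  def checkpoint
    @calls += 1
  end
end

# Mirrors the constructor and process_records shown on this page.
class BlockSketch
  def initialize(&block)
    raise ArgumentError, "No block given" unless block_given?
    @block = block
  end

  def process_records(input)
    @block.call(input.records, input.checkpointer, input.millis_behind_latest)
  end
end

# Runs one batch through the processor and reports what the block observed.
def run_block_sketch
  seen = []
  lag = nil
  processor = BlockSketch.new do |records, checkpointer, millis_behind_latest|
    records.each { |r| seen << r }
    lag = millis_behind_latest
    checkpointer.checkpoint
  end

  checkpointer = FakeCheckpointer.new
  processor.process_records(FakeInput.new(%w[a b], checkpointer, 250))
  { seen: seen, lag: lag, checkpoints: checkpointer.calls }
end

puts run_block_sketch.inspect
```

Since `process_records` as shown has no rescue clause, an exception raised inside the block propagates to whatever called `process_records`.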