Class: Telekinesis::Consumer::Block
- Inherits:
-
BaseProcessor
- Object
- BaseProcessor
- Telekinesis::Consumer::Block
- Defined in:
- lib/telekinesis/consumer/block.rb
Overview
A RecordProcessor that uses the given block to process records. Useful to quickly define a consumer.
Telekinesis::Consumer::Worker.new(stream: ‘my-stream’, app: ‘tail’) do
Telekinesis::Consumer::Block.new do |records, checkpointer, millis_behind_latest|
records.each {|r| puts r}
$stderr.puts "#{millis_behind_latest} ms behind"
checkpointer.checkpoint
end
end
Instance Method Summary collapse
-
#initialize(&block) ⇒ Block
constructor
A new instance of Block.
- #process_records(input) ⇒ Object
Methods inherited from BaseProcessor
Constructor Details
#initialize(&block) ⇒ Block
Returns a new instance of Block.
14 15 16 17 |
# File 'lib/telekinesis/consumer/block.rb', line 14 def initialize(&block) raise ArgumentError, "No block given" unless block_given? @block = block end |
Instance Method Details
#process_records(input) ⇒ Object
19 20 21 |
# File 'lib/telekinesis/consumer/block.rb', line 19 def process_records(input) @block.call(input.records, input.checkpointer, input.millis_behind_latest) end |