Class: LogStash::Inputs::Kinesis::Worker

Inherits:
Object
  • Object
show all
Includes:
comcom.amazonawscom.amazonaws.servicescom.amazonaws.services.kinesiscom.amazonaws.services.kinesis.clientlibrarycom.amazonaws.services.kinesis.clientlibrary.interfacescom.amazonaws.services.kinesis.clientlibrary.interfaces::IRecordProcessor
Defined in:
lib/logstash/inputs/kinesis/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Worker

Returns a new instance of Worker.



12
13
14
15
16
17
18
19
20
21
22
# File 'lib/logstash/inputs/kinesis/worker.rb', line 12

def initialize(*args)
  # nasty hack, because this is the name of a method on IRecordProcessor, but also ruby's constructor
  if !@constructed
    @codec, @output_queue, @decorator, @checkpoint_interval, @logger = args
    @next_checkpoint = Time.now - 600
    @constructed = true
  else
    _shard_id, _ = args
    @decoder = java.nio.charset::Charset.forName("UTF-8").newDecoder()
  end
end

Instance Attribute Details

#checkpoint_intervalObject (readonly)

Returns the value of attribute checkpoint_interval.



4
5
6
# File 'lib/logstash/inputs/kinesis/worker.rb', line 4

def checkpoint_interval
  @checkpoint_interval
end

#codecObject (readonly)

Returns the value of attribute codec.



4
5
6
# File 'lib/logstash/inputs/kinesis/worker.rb', line 4

def codec
  @codec
end

#decoratorObject (readonly)

Returns the value of attribute decorator.



4
5
6
# File 'lib/logstash/inputs/kinesis/worker.rb', line 4

def decorator
  @decorator
end

#loggerObject (readonly)

Returns the value of attribute logger.



4
5
6
# File 'lib/logstash/inputs/kinesis/worker.rb', line 4

def logger
  @logger
end

Instance Method Details

#processRecords(records, checkpointer) ⇒ Object



25
26
27
28
29
30
31
# File 'lib/logstash/inputs/kinesis/worker.rb', line 25

def processRecords(records, checkpointer)
  records.each { |record| process_record(record) }
  if Time.now >= @next_checkpoint
    checkpoint(checkpointer)
    @next_checkpoint = Time.now + @checkpoint_interval
  end
end

#shutdown(checkpointer, reason) ⇒ Object



33
34
35
36
37
# File 'lib/logstash/inputs/kinesis/worker.rb', line 33

def shutdown(checkpointer, reason)
  if reason == com.amazonaws.services.kinesis.clientlibrary.types::ShutdownReason::TERMINATE
    checkpoint(checkpointer)
  end
end