Class: Aws::KCLrb::RecordProcessorBase Abstract

Inherits:
Object
Defined in:
lib/aws/kclrb/record_processor.rb

Overview

This class is abstract.

Base class for implementing a record processor.

A RecordProcessor processes a shard in a stream. See the corresponding KCL interface. Its methods will be called as follows:

  1. #init_processor will be called once
  2. #process_records will be called zero or more times
  3. #shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or when the end of the shard is reached
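
The call order above can be sketched with a small subclass. This is only an illustration: the RecordProcessorBase defined here is a stand-in that mirrors the method stubs listed on this page so the sketch runs without the gem, and LoggingProcessor, the shard id, and the hand-driven calls at the end are hypothetical. In practice the KCLProcess drives these calls.

```ruby
# Stand-in for Aws::KCLrb::RecordProcessorBase (assumption: mirrors the stubs
# documented on this page). With the gem loaded, inherit from the real class.
class RecordProcessorBase
  def init_processor(shard_id)
    fail NotImplementedError.new
  end

  def process_records(records, checkpointer)
    fail NotImplementedError.new
  end

  def shutdown(checkpointer, reason)
    fail NotImplementedError.new
  end
end

# Hypothetical processor that records the order in which its methods are called.
class LoggingProcessor < RecordProcessorBase
  attr_reader :calls

  def initialize
    @calls = []
  end

  def init_processor(shard_id)
    @calls << [:init_processor, shard_id]
  end

  def process_records(records, checkpointer)
    @calls << [:process_records, records.length]
  end

  def shutdown(checkpointer, reason)
    @calls << [:shutdown, reason]
  end
end

# Simulate the lifecycle by hand: one init, zero or more batches, one shutdown.
processor = LoggingProcessor.new
processor.init_processor('shardId-000000000000')
processor.process_records([], nil)
processor.shutdown(nil, 'ZOMBIE')
```
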

Instance Method Details

#init_processor(shard_id) ⇒ Object

This method is abstract.

Called once by a KCLProcess before any calls to process_records.

Parameters:

  • shard_id (String)

    The shard id that this processor is going to be working on.



# File 'lib/aws/kclrb/record_processor.rb', line 22

def init_processor(shard_id)
  fail NotImplementedError.new
end

#process_records(records, checkpointer) ⇒ Object

This method is abstract.

Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers from the records to indicate where in the stream to checkpoint.

Parameters:

  • records (Array<Hash>)

    A list of records that are to be processed. A record looks like:

    {"data":"<base64 encoded string>","partitionKey":"someKey","sequenceNumber":"1234567890"}
    

    Note that the data attribute is a base64-encoded string. Use Base64.decode64 from the base64 module to recover the original data as a string.

  • checkpointer (Checkpointer)

    A checkpointer which accepts a sequence number or no parameters.



# File 'lib/aws/kclrb/record_processor.rb', line 42

def process_records(records, checkpointer)
  fail NotImplementedError.new
end
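
A minimal sketch of an implementation, assuming records arrive as hashes with string keys (as in the JSON shape shown above) and that the checkpointer exposes a checkpoint method taking an optional sequence number. FakeCheckpointer and the sample record are hypothetical and exist only so the example runs on its own.

```ruby
require 'base64'

# Hypothetical stand-in for the Checkpointer; it only remembers what it was given.
class FakeCheckpointer
  attr_reader :last_sequence_number

  def checkpoint(sequence_number = nil)
    @last_sequence_number = sequence_number
  end
end

# Decodes each record's payload, then checkpoints at the last sequence number seen.
def process_records(records, checkpointer)
  payloads = records.map { |record| Base64.decode64(record['data']) }
  last = records.last
  checkpointer.checkpoint(last['sequenceNumber']) if last
  payloads
end

# Sample batch in the documented shape; the payload here is illustrative.
records = [
  {
    'data' => Base64.strict_encode64('hello'),
    'partitionKey' => 'someKey',
    'sequenceNumber' => '1234567890'
  }
]
checkpointer = FakeCheckpointer.new
decoded = process_records(records, checkpointer)
```

Checkpointing once per batch, after all records are handled, is one possible policy; checkpointing less often trades fewer checkpoint calls for more reprocessing after a restart.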

#shutdown(checkpointer, reason) ⇒ Object

This method is abstract.

Called by a KCLProcess instance to indicate that this record processor should shut down. After this is called, there will be no more calls to any other methods of this record processor.

Parameters:

  • checkpointer (Checkpointer)

    A checkpointer which accepts a sequence number or no parameters.

  • reason (String)

    The reason this record processor is being shut down, either TERMINATE or ZOMBIE.

    • If ZOMBIE, clients should not checkpoint, because another record processor may have acquired the lease for this shard.
    • If TERMINATE, checkpointer.checkpoint() (without parameters) should be called to checkpoint at the end of the shard, so that this processor is shut down and new processor(s) are created for the child(ren) of this shard.


# File 'lib/aws/kclrb/record_processor.rb', line 62

def shutdown(checkpointer, reason)
  fail NotImplementedError.new
end
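
The two shutdown reasons can be handled as in the following sketch. It assumes reason arrives as the plain strings 'TERMINATE' and 'ZOMBIE'; CountingCheckpointer is a hypothetical stand-in for the real Checkpointer that only counts calls.

```ruby
# Hypothetical stand-in for the Checkpointer; counts how often it is called.
class CountingCheckpointer
  attr_reader :checkpoint_calls

  def initialize
    @checkpoint_calls = 0
  end

  def checkpoint(sequence_number = nil)
    @checkpoint_calls += 1
  end
end

def shutdown(checkpointer, reason)
  # Checkpoint only at the end of the shard. On ZOMBIE another processor may
  # already hold the lease, so checkpointing is skipped.
  checkpointer.checkpoint if reason == 'TERMINATE'
end

terminate_checkpointer = CountingCheckpointer.new
shutdown(terminate_checkpointer, 'TERMINATE')

zombie_checkpointer = CountingCheckpointer.new
shutdown(zombie_checkpointer, 'ZOMBIE')
```
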

#shutdown_requested(checkpointer) ⇒ Object

This method is abstract.

Called by a KCLProcess instance to indicate that this record processor is requesting a shutdown. This method should be overridden if required.

Parameters:

  • checkpointer (Checkpointer)

    A checkpointer which accepts a sequence number or no parameters.



# File 'lib/aws/kclrb/record_processor.rb', line 72

def shutdown_requested(checkpointer)
end

#version ⇒ Object



# File 'lib/aws/kclrb/record_processor.rb', line 75

def version
  1
end