Class: Fluent::KinesisAltOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::KinesisAltOutput
- Defined in:
- lib/fluent/plugin/out_kinesis_alt.rb
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ KinesisAltOutput
constructor
A new instance of KinesisAltOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ KinesisAltOutput
Returns a new instance of KinesisAltOutput.
5 6 7 8 9 10 11 |
# File 'lib/fluent/plugin/out_kinesis_alt.rb', line 5 def initialize super require 'aws-sdk' require 'base64' require 'json' require 'logger' end |
Instance Method Details
#configure(conf) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/fluent/plugin/out_kinesis_alt.rb', line 29 def configure(conf) super [:aws_key_id, :aws_sec_key, :region, :stream_name].each do |name| unless self.instance_variable_get("@#{name}") raise ConfigError, "'#{name}' is required" end end unless @partition_key or @partition_key_proc raise ConfigError, "'partition_key' or 'partition_key_proc' is required" end if @partition_key_proc @partition_key_proc = eval(@partition_key_proc) end if @explicit_hash_key_proc @explicit_hash_key_proc = eval(@explicit_hash_key_proc) end end |
#format(tag, time, record) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/fluent/plugin/out_kinesis_alt.rb', line 62 def format(tag, time, record) record['__tag'] = tag if @include_tag record['__time'] = time if @include_time # XXX: The maximum size of the data blob is 50 kilobytes # http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html data = { :stream_name => @stream_name, :data => encode64(record.to_json), :partition_key => get_key(:partition_key, record) } if @explicit_hash_key or @explicit_hash_key_proc data[:explicit_hash_key] = get_key(:explicit_hash_key, record) end if @sequence_number_for_ordering data[:sequence_number_for_ordering] = @sequence_number_for_ordering end pack_data(data) end |
#shutdown ⇒ Object
58 59 60 |
# File 'lib/fluent/plugin/out_kinesis_alt.rb', line 58 def shutdown super end |
#start ⇒ Object
51 52 53 54 55 56 |
# File 'lib/fluent/plugin/out_kinesis_alt.rb', line 51 def start super configure_aws @client = AWS.kinesis.client @client.describe_stream(:stream_name => @stream_name) end |
#write(chunk) ⇒ Object
85 86 87 88 89 90 91 |
# File 'lib/fluent/plugin/out_kinesis_alt.rb', line 85 def write(chunk) buf = chunk.read while (data = unpack_data(buf)) AWS.kinesis.client.put_record(data) end end |