Class: KinesisClient
- Inherits:
-
Object
show all
- Defined in:
- lib/kinesis_client.rb
Overview
Model representing the result message posted to a Kinesis stream about everything that has gone on here – good, bad, or otherwise.
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
Returns a new instance of KinesisClient.
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
# File 'lib/kinesis_client.rb', line 11
# Builds a client for the Kinesis stream described by +config+.
#
# @param config [Hash] settings read here: :stream_name, :batch_size
#   (falls back to 1), :automatically_push (enabled unless it is exactly
#   false), :schema_string (optional), plus whatever #set_config reads
def initialize(config)
  @config = config
  @stream_name = @config[:stream_name]
  @batch_size = @config[:batch_size] || 1
  # Anything other than an explicit false leaves automatic pushing enabled.
  @automatically_push = @config[:automatically_push] != false
  @records = []
  @failed_records = []
  @client_options = set_config(config)
  @client = Aws::Kinesis::Client.new(@client_options)
  # The Avro codec is only looked up when a schema name is configured.
  schema_name = config[:schema_string]
  @avro = schema_name ? NYPLAvro.by_name(schema_name) : nil
  # Selects the method that #<< dispatches to.
  @shovel_method = if @batch_size > 1
                     :push_to_records
                   else
                     :push_record
                   end
end
|
Instance Attribute Details
#avro ⇒ Object
Note: :failed_records has a custom-defined reader method.
9
10
11
|
# File 'lib/kinesis_client.rb', line 9
# Reader for the @avro instance variable.
#
# @return [Object, nil] the stored value, or nil when none has been set
def avro
  @avro
end
|
#config ⇒ Object
Note: :failed_records has a custom-defined reader method.
9
10
11
|
# File 'lib/kinesis_client.rb', line 9
# Reader for the @config instance variable.
#
# @return [Object, nil] the stored value, or nil when none has been set
def config
  @config
end
|
Instance Method Details
#<<(json_message) ⇒ Object
51
52
53
|
# File 'lib/kinesis_client.rb', line 51
# Shovel operator: forwards +json_message+ to the method whose name is
# stored in @shovel_method.
#
# @param json_message [Object] message handed over unchanged
# @return [Object] whatever the selected method returns
def <<(json_message)
  strategy = @shovel_method
  __send__(strategy, json_message)
end
|
#convert_to_record(json_message) ⇒ Object
37
38
39
40
41
42
43
44
45
46
47
48
49
|
# File 'lib/kinesis_client.rb', line 37
# Turns +json_message+ into a record hash with :data and :partition_key.
#
# The payload is encoded through #avro when config[:schema_string] is set;
# otherwise the message is used as-is. The partition key is the stringified
# #hash of json_message[config[:partition_key]] when a partition key field is
# configured, or of a fresh random hex string when it is not.
#
# NOTE(review): String#hash is salted per Ruby process, so a string field
# value will not yield the same key across runs - confirm that is acceptable.
#
# @param json_message [Object] message; must support #[] when a partition
#   key field is configured
# @return [Hash] { data: payload, partition_key: String }
def convert_to_record(json_message)
  payload = config[:schema_string] ? avro.encode(json_message, false) : json_message

  key_field = config[:partition_key]
  key_source = key_field ? json_message[key_field] : SecureRandom.hex(20)

  { data: payload, partition_key: key_source.hash.to_s }
end
|
#failed_records ⇒ Object
122
123
124
|
# File 'lib/kinesis_client.rb', line 122
# Returns the records that previously failed to send, decoded through #avro
# when a codec is available.
#
# When no codec is set (#avro is nil) the stored records are returned
# unchanged in a new array instead of raising NoMethodError on nil.
#
# NOTE(review): the stored entries are whole record hashes as appended by
# #push_batch; confirm the codec's #decode accepts that shape rather than
# just the :data payload.
#
# @return [Array] decoded records, or a copy of the raw records without a codec
def failed_records
  codec = avro
  return @failed_records.dup unless codec

  @failed_records.map { |record| codec.decode(record) }
end
|
#filter_failures(resp, batch) ⇒ Object
108
109
110
111
112
|
# File 'lib/kinesis_client.rb', line 108
# Pairs each failed result entry in +resp+ with the record at the same
# position in +batch+.
#
# An entry counts as failed when it exposes a non-nil #error_message. The
# previous check called the non-existent #responds_to? and therefore raised
# NoMethodError; checking #respond_to? alone would flag every entry of a
# result type that always defines the accessor, so the value is tested too.
#
# @param resp [#records] response whose #records line up index-for-index with +batch+
# @param batch [#[]] the records that were sent
# @return [Array<Hash>] one { record:, error_message: } hash per failed entry
def filter_failures(resp, batch)
  resp.records.each_with_index.filter_map do |entry, index|
    next unless entry.respond_to?(:error_message) && entry.error_message

    { record: batch[index], error_message: entry.error_message }
  end
end
|
#push_batch(batch) ⇒ Object
85
86
87
88
89
90
91
92
93
94
95
96
97
|
# File 'lib/kinesis_client.rb', line 85
# Sends +batch+ to the stream in a single put_records call and remembers any
# records the response reports as failed.
#
# Failed records (as identified by #filter_failures) are appended to
# @failed_records. Logging goes through $logger only when one is set, which
# matches the guard used in #push_record; previously a missing logger raised
# NoMethodError after the records had already been sent.
#
# @param batch [#to_a] records to send
def push_batch(batch)
  request = { records: batch.to_a, stream_name: @stream_name }
  resp = @client.put_records(request)

  if resp.failed_record_count.positive?
    failures = filter_failures(resp, batch)
    $logger&.warn("Batch sent to #{config[:stream_name]} with #{failures.length} failures: #{failures}")
    failures.each { |failure| @failed_records << failure[:record] }
  else
    $logger&.info("Batch sent to #{config[:stream_name]} successfully")
  end
end
|
#push_record(json_message) ⇒ Object
This method is broken. TODO: figure out how to determine whether a record was sent successfully or failed; `successful?` is not a method on the response object.
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
# File 'lib/kinesis_client.rb', line 57
# Converts +json_message+ to a record and sends it with a single put_record call.
#
# The response from the client is now captured; before, +resp+ was never
# assigned, so every call raised NameError. A response that does not expose
# #successful? is treated as a success.
# NOTE(review): this relies on the client raising for failed requests, which
# is believed to be the AWS SDK default - confirm against the client options.
#
# @param json_message [Object] message to send
# @return [Hash] { "code" => "200", "message" => [json_message, response] }
# @raise [NYPLError] when the response reports that it was not successful
def push_record(json_message)
  record = convert_to_record(json_message)
  record[:stream_name] = @stream_name
  resp = @client.put_record(record)

  succeeded = !resp.respond_to?(:successful?) || resp.successful?
  unless succeeded
    $logger&.error("message" => "FAILED to send message to #{@stream_name} #{json_message}, #{resp}.")
    raise(NYPLError.new(json_message, resp))
  end

  $logger&.info("Message sent to #{config[:stream_name]} #{json_message}, #{resp}")
  { "code" => "200", "message" => [json_message, resp] }
end
|
#push_records ⇒ Object
99
100
101
102
103
104
105
106
|
# File 'lib/kinesis_client.rb', line 99
# Sends every buffered record through #push_batch in slices of @batch_size,
# then replaces the buffer with an empty array.
#
# @return [Array, nil] the new empty buffer, or nil when nothing was buffered
def push_records
  return if @records.empty?

  @records.each_slice(@batch_size) { |chunk| push_batch(chunk) }
  @records = []
end
|
#push_to_records(json_message) ⇒ Object
76
77
78
79
80
81
82
83
|
# File 'lib/kinesis_client.rb', line 76
# Converts +json_message+ and appends it to the buffer, flushing via
# #push_records once the buffer holds at least @batch_size records and
# automatic pushing is enabled.
#
# A message that fails Avro encoding is logged and skipped. The log call is
# now guarded, matching #push_record, so a missing $logger no longer turns
# the handled AvroError into a NoMethodError.
#
# @param json_message [Object] message to buffer
# @return [Object, nil] the result of #push_records when a flush happened, else nil
def push_to_records(json_message)
  begin
    @records << convert_to_record(json_message)
  rescue AvroError => e
    $logger&.error("message" => "Avro encoding error #{e.message} for #{json_message}")
  end

  push_records if @automatically_push && @records.length >= @batch_size
end
|
#retry_failed_records ⇒ Object
114
115
116
117
118
119
120
|
# File 'lib/kinesis_client.rb', line 114
# Re-queues the previously failed records and pushes them again.
#
# Failed records are placed ahead of anything still waiting in the buffer
# and the whole buffer is pushed. Before, the buffer was overwritten with the
# failed records, silently dropping any records that had not been sent yet.
#
# @return [Object, nil] the result of #push_records, or nil when there was
#   nothing to retry
def retry_failed_records
  return if @failed_records.empty?

  @records = @failed_records + @records
  @failed_records = []
  push_records
end
|
#set_config(config) ⇒ Object
27
28
29
30
31
32
33
34
35
|
# File 'lib/kinesis_client.rb', line 27
# Picks the options hash used to build the AWS client.
#
# Precedence: a truthy config[:profile] yields { profile: ... }; otherwise a
# truthy config[:custom_aws_config] is returned as-is; otherwise an empty hash.
#
# @param config [Hash] client settings
# @return [Object] the selected options
def set_config(config)
  return { profile: config[:profile] } if config[:profile]

  config[:custom_aws_config] || {}
end
|