Class: KinesisClient
- Inherits:
-
Object
show all
- Defined in:
- lib/kinesis_client.rb
Overview
Model representing the result message posted to a Kinesis stream about everything that has gone on here – good, bad, or otherwise.
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
Returns a new instance of KinesisClient.
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
# File 'lib/kinesis_client.rb', line 10
# Builds the client wrapper from a configuration hash.
#
# Keys read directly here: :stream_name, :batch_size (falls back to 1),
# :automatically_push (enabled unless explicitly false) and :schema_string.
# AWS client options are derived from the same hash through set_config.
#
# @param config [Hash] client configuration
def initialize(config)
  @config = config
  @stream_name = @config[:stream_name]
  @batch_size = @config[:batch_size] || 1
  @automatically_push = @config[:automatically_push] != false

  # Buffer state used by the batching path.
  @batch_count = 0
  @records = []

  @client_options = set_config(config)
  @client = Aws::Kinesis::Client.new(@client_options)

  # An Avro codec is only looked up when a schema name was supplied.
  schema_name = config[:schema_string]
  @avro = schema_name ? NYPLAvro.by_name(schema_name) : nil

  # `<<` buffers records when batching is on, otherwise sends one at a time.
  @shovel_method = @batch_size > 1 ? :push_to_records : :push_record
end
|
Instance Attribute Details
#avro ⇒ Object
Returns the value of attribute avro.
8
9
10
|
# File 'lib/kinesis_client.rb', line 8
# Reader for @avro. The constructor leaves it nil unless config[:schema_string]
# is set, in which case it holds the result of NYPLAvro.by_name.
def avro
@avro
end
|
#config ⇒ Object
Returns the value of attribute config.
8
9
10
|
# File 'lib/kinesis_client.rb', line 8
# Reader for @config, the configuration object passed to the constructor.
def config
@config
end
|
Instance Method Details
#<<(json_message) ⇒ Object
50
51
52
|
# File 'lib/kinesis_client.rb', line 50
# Shovel operator: hands the message to whichever push method was selected
# at construction time (stored in @shovel_method).
#
# @param json_message [Object] message to send or buffer
# @return [Object] whatever the selected push method returns
def <<(json_message)
  __send__(@shovel_method, json_message)
end
|
#convert_to_record(json_message) ⇒ Object
36
37
38
39
40
41
42
43
44
45
46
47
48
|
# File 'lib/kinesis_client.rb', line 36
# Turns a message into the { data:, partition_key: } hash used for Kinesis puts.
#
# The payload is Avro-encoded when config[:schema_string] is set; otherwise the
# message is passed through untouched. The partition key is the stringified
# #hash of either the message field named by config[:partition_key] or, when no
# field is configured, a random hex string.
# NOTE(review): Object#hash is typically seeded per process for strings, so
# keys derived this way may not be stable across runs - confirm that is intended.
#
# @param json_message [Object] message to convert (indexed with [] when a
#   partition key field is configured)
# @return [Hash] record with :data and :partition_key
def convert_to_record(json_message)
  payload = config[:schema_string] ? avro.encode(json_message, false) : json_message

  key_field = config[:partition_key]
  key_source = key_field ? json_message[key_field] : SecureRandom.hex(20)

  { data: payload, partition_key: key_source.hash.to_s }
end
|
#filter_failures(resp) ⇒ Object
111
112
113
114
115
|
# File 'lib/kinesis_client.rb', line 111
# Collects the buffered records that a put_records response reports as failed.
#
# Result entries are matched to @records by position: entry i of the current
# batch corresponds to @records[i + @batch_size * @batch_count].
#
# An entry counts as failed only when it exposes a non-nil error_message;
# checking respond_to? alone would also match successful entries whenever the
# entry type always defines that accessor.
#
# @param resp [#records] response whose records line up with the batch sent
# @return [Array] failed records, decoded through avro when a codec is
#   configured, otherwise the buffered record as-is
def filter_failures(resp)
  offset = @batch_size * @batch_count

  resp.records.filter_map.with_index do |record, i|
    next unless record.respond_to?(:error_message) && record.error_message

    failed = @records[offset + i]
    # NOTE(review): the buffered entry is the { data:, partition_key: } hash
    # built by convert_to_record; confirm avro.decode accepts that shape.
    avro ? avro.decode(failed) : failed
  end
end
|
#push_batch(batch) ⇒ Object
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
# File 'lib/kinesis_client.rb', line 83
# Sends one batch of records to the stream with put_records and logs the outcome.
#
# Logging is skipped when no global $logger is set, matching push_record,
# so a missing logger no longer raises after the batch has been sent.
#
# @param batch [#to_a] records shaped like the output of convert_to_record
# @return [Object, nil] the logger call's result, or nil without a logger
def push_batch(batch)
  resp = @client.put_records({
    records: batch.to_a,
    stream_name: @stream_name
  })

  if resp.failed_record_count > 0
    failure_message = {
      failures: resp.failed_record_count,
      failures_data: filter_failures(resp)
    }
    $logger&.warn("Batch sent to #{config[:stream_name]} with failures: #{failure_message}")
  else
    $logger&.info("Batch sent to #{config[:stream_name]} successfully")
  end
end
|
#push_record(json_message) ⇒ Object
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
# File 'lib/kinesis_client.rb', line 55
# Sends a single message to the stream with put_record.
#
# The response is now captured in resp; it was previously discarded, so the
# success check that follows referenced an undefined local.
#
# @param json_message [Object] message to convert and send
# @return [Hash] { "code" => "200", "message" => [json_message, resp] } on success
# @raise [NYPLError] when the response is not successful
def push_record(json_message)
  record = convert_to_record(json_message)
  record[:stream_name] = @stream_name
  resp = @client.put_record(record)

  return_hash = {}
  if resp.successful?
    return_hash["code"] = "200"
    # The comma makes this an Array of [json_message, resp].
    return_hash["message"] = json_message, resp
    $logger.info("Message sent to #{config[:stream_name]} #{json_message}, #{resp}") if $logger
  else
    $logger.error("message" => "FAILED to send message to #{@stream_name} #{json_message}, #{resp}.") if $logger
    raise(NYPLError.new(json_message, resp))
  end
  return_hash
end
|
#push_records ⇒ Object
100
101
102
103
104
105
106
107
108
109
|
# File 'lib/kinesis_client.rb', line 100
# Flushes the buffered records to the stream in slices of @batch_size.
#
# @batch_count tracks the index of the slice currently being sent. The buffer
# and the counter are both reset once every slice has been pushed.
#
# @return [Integer, nil] 0 after a flush, nil when the buffer was empty
def push_records
  return if @records.empty?

  @records.each_slice(@batch_size) do |chunk|
    push_batch(chunk)
    @batch_count += 1
  end

  @records = []
  @batch_count = 0
end
|
#push_to_records(json_message) ⇒ Object
74
75
76
77
78
79
80
81
|
# File 'lib/kinesis_client.rb', line 74
# Buffers a message for batched delivery and flushes when the buffer is full.
#
# A message that fails Avro encoding is logged and dropped rather than
# buffered. Logging is skipped when no global $logger is set, matching
# push_record, so a handled encoding error does not surface as a
# NoMethodError on nil.
#
# @param json_message [Object] message to convert and buffer
# @return [Object, nil] result of push_records when a flush happened, else nil
def push_to_records(json_message)
  begin
    @records << convert_to_record(json_message)
  rescue AvroError => e
    $logger&.error("message" => "Avro encoding error #{e.message} for #{json_message}")
  end

  # Flush only when automatic pushing is on and a full batch is buffered.
  push_records if @automatically_push && @records.length >= @batch_size
end
|
#set_config(config) ⇒ Object
26
27
28
29
30
31
32
33
34
|
# File 'lib/kinesis_client.rb', line 26
# Picks the options hash handed to the AWS client constructor.
#
# A :profile entry wins and is wrapped as { profile: ... }. Otherwise
# :custom_aws_config is used as-is, and an empty hash is the fallback.
#
# @param config [Hash] client configuration
# @return [Object] options for the AWS client (a Hash in the visible cases)
def set_config(config)
  return { profile: config[:profile] } if config[:profile]

  config[:custom_aws_config] || {}
end
|