Class: KinesisClient
- Inherits:
-
Object
show all
- Defined in:
- lib/kinesis_client.rb
Overview
Model representing the result message posted to Kinesis stream about everything that has gone on here – good, bad, or otherwise.
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
Returns a new instance of KinesisClient.
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
# File 'lib/kinesis_client.rb', line 10
def initialize(config)
@config = config
@stream_name = @config[:stream_name]
@avro = nil
@batch_size = @config[:batch_size] || 1
@batch_count = 0
@records = []
@automatically_push = !(@config[:automatically_push] == false)
@client_options = config[:profile] ? { profile: config[:profile] } : {}
@client = Aws::Kinesis::Client.new(@client_options)
@avro = NYPLAvro.by_name(config[:schema_string]) if config[:schema_string]
@shovel_method = @batch_size > 1 ? :push_to_records : :push_record
end
|
Instance Attribute Details
#avro ⇒ Object
Returns the value of attribute avro.
8
9
10
|
# File 'lib/kinesis_client.rb', line 8
def avro
@avro
end
|
#config ⇒ Object
Returns the value of attribute config.
8
9
10
|
# File 'lib/kinesis_client.rb', line 8
def config
@config
end
|
Instance Method Details
#<<(json_message) ⇒ Object
40
41
42
|
# File 'lib/kinesis_client.rb', line 40
def <<(json_message)
send(@shovel_method, json_message)
end
|
#convert_to_record(json_message) ⇒ Object
26
27
28
29
30
31
32
33
34
35
36
37
38
|
# File 'lib/kinesis_client.rb', line 26
def convert_to_record(json_message)
if config[:schema_string]
message = avro.encode(json_message, false)
else
message = json_message
end
partition_key = (config[:partition_key] ? json_message[config[:partition_key]] : SecureRandom.hex(20)).hash.to_s
{
data: message,
partition_key: partition_key
}
end
|
#filter_failures(resp) ⇒ Object
102
103
104
105
106
|
# File 'lib/kinesis_client.rb', line 102
def filter_failures(resp)
resp.records.filter_map.with_index do |record, i|
avro.decode(@records[i + @batch_size * @batch_count]) if record.responds_to?(:error_message)
end
end
|
#push_batch(batch) ⇒ Object
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
# File 'lib/kinesis_client.rb', line 72
def push_batch(batch)
resp = @client.put_records({
records: batch.to_a,
stream_name: @stream_name
})
$logger.debug("Received #{resp} from #{@stream_name}")
if resp.failed_record_count > 0
return_message = {
failures: resp.failed_record_count,
failures_data: filter_failures(resp)
}
$logger.warn("Message sent to #{config[:stream_name]} #{return_message}") if $logger
else
$logger.info("Message sent to #{config[:stream_name]} successfully") if $logger
end
end
|
#push_record(json_message) ⇒ Object
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
# File 'lib/kinesis_client.rb', line 44
def push_record(json_message)
record = convert_to_record(json_message)
record[:stream_name] = @stream_name
@client.put_record(record)
return_hash = {}
if resp.successful?
return_hash["code"] = "200"
return_hash["message"] = json_message, resp
$logger.info("Message sent to #{config[:stream_name]} #{json_message}, #{resp}") if $logger
else
$logger.error("message" => "FAILED to send message to #{@stream_name} #{json_message}, #{resp}.") if $logger
raise(NYPLError.new(json_message, resp))
end
return_hash
end
|
#push_records ⇒ Object
91
92
93
94
95
96
97
98
99
100
|
# File 'lib/kinesis_client.rb', line 91
def push_records
if @records.length > 0
@records.each_slice(@batch_size) do |slice|
push_batch(slice)
@batch_count += 1
end
@records = []
@batch_count = 0
end
end
|
#push_to_records(json_message) ⇒ Object
63
64
65
66
67
68
69
70
|
# File 'lib/kinesis_client.rb', line 63
def push_to_records(json_message)
begin
@records << convert_to_record(json_message)
rescue AvroError => e
$logger.error("message" => "Avro encoding error #{e.message} for #{json_message}")
end
push_records if @automatically_push && @records.length >= @batch_size
end
|