Class: KinesisClient
- Inherits:
-
Object
show all
- Defined in:
- lib/kinesis_client.rb
Overview
Model representing the result message posted to a Kinesis stream about everything that has gone on here – good, bad, or otherwise.
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
Returns a new instance of KinesisClient.
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
# File 'lib/kinesis_client.rb', line 10
# Builds a client from a configuration hash.
#
# Keys read from +config+:
#   :stream_name        - stored and used as the target stream name
#   :batch_size         - records per batch; falls back to 1 when absent
#   :automatically_push - batching flushes automatically unless this is exactly false
#   :profile            - when present, forwarded to Aws::Kinesis::Client.new
#   :schema_string      - when present, an encoder is looked up via NYPLAvro.by_name
#
# @param config [Hash] configuration options (see keys above)
def initialize(config)
  @config = config
  @stream_name = @config[:stream_name]
  @batch = []
  @batch_size = @config[:batch_size] || 1
  # Anything other than an explicit `false` (including nil) leaves auto-push on.
  @automatically_push = @config[:automatically_push] != false
  @avro = nil

  @client_options = {}
  @client_options[:profile] = config[:profile] if config[:profile]
  @client = Aws::Kinesis::Client.new(@client_options)

  @avro = NYPLAvro.by_name(config[:schema_string]) if config[:schema_string]

  # `<<` dispatches to this method: batching only when batch size exceeds one.
  @shovel_method = @batch_size > 1 ? :push_to_batch : :push_record
end
|
Instance Attribute Details
#avro ⇒ Object
Returns the value of attribute avro.
8
9
10
|
# File 'lib/kinesis_client.rb', line 8
# Reader for the @avro instance variable.
#
# @return [Object, nil] the current value of @avro
def avro
@avro
end
|
#config ⇒ Object
Returns the value of attribute config.
8
9
10
|
# File 'lib/kinesis_client.rb', line 8
# Reader for the @config instance variable.
#
# @return [Object] the current value of @config
def config
@config
end
|
Instance Method Details
#<<(json_message) ⇒ Object
43
44
45
|
# File 'lib/kinesis_client.rb', line 43
# Shovel operator: forwards the message to whichever method @shovel_method names.
#
# @param json_message [Object] the message to send or enqueue
# @return [Object] whatever the dispatched method returns
def <<(json_message)
  handler = @shovel_method
  __send__(handler, json_message)
end
|
#convert_to_record(json_message) ⇒ Object
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
# File 'lib/kinesis_client.rb', line 28
# Turns a message into a record hash with :data and :partition_key.
#
# When config[:schema_string] is set the payload is avro.encode(json_message, false);
# otherwise the message is passed through untouched. The partition key is the
# stringified #hash of either the message field named by config[:partition_key]
# or, when no field is configured, a fresh random hex string.
#
# @param json_message [Hash] the message; indexed with config[:partition_key] when that is set
# @return [Hash] { data: ..., partition_key: String }
def convert_to_record(json_message)
  payload = config[:schema_string] ? avro.encode(json_message, false) : json_message

  key_field = config[:partition_key]
  key_source = key_field ? json_message[key_field] : SecureRandom.hex(20)

  { data: payload, partition_key: key_source.hash.to_s }
end
|
#push_batch(batch) ⇒ Object
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
# File 'lib/kinesis_client.rb', line 77
# Sends a group of records to the stream with a single put_records call.
#
# All logging is skipped when no global $logger is set. The debug call used to
# be unguarded, which raised NoMethodError on nil when no logger was configured.
#
# @param batch [#to_a] record hashes, e.g. as produced by convert_to_record
# @return [Hash] { code: "200", message: String } where message is the JSON
#   encoding of the failure count and the non-nil per-record error messages
def push_batch(batch)
  resp = @client.put_records(
    records: batch.to_a,
    stream_name: @stream_name
  )
  $logger.debug("Received #{resp} from #{@stream_name}") if $logger

  return_message = {
    failures: resp.failed_record_count,
    # Records without an error message are dropped from the summary.
    error_messages: resp.records.map { |record| record.error_message }.compact
  }
  $logger.info("Message sent to #{config[:stream_name]} #{return_message}") if $logger

  # The keys are Symbols (:code / :message), as in the original `"code":` literal.
  {
    code: "200",
    message: return_message.to_json
  }
end
|
#push_record(json_message) ⇒ Object
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
# File 'lib/kinesis_client.rb', line 47
# Converts one message to a record and sends it with put_record.
#
# Fixes: the put_record result is now captured in `resp`. It was previously
# discarded, so every later reference to `resp` raised NameError. The failure
# log also names the configured stream instead of a hard-coded stream name.
#
# @param json_message [Object] the message to send
# @return [Hash] { "code" => "200", "message" => [json_message, resp] } on success
# @raise [NYPLError] when the response does not report success
def push_record(json_message)
  record = convert_to_record(json_message)
  record[:stream_name] = @stream_name
  resp = @client.put_record(record)

  unless resp.successful?
    $logger.error("message" => "FAILED to send message to #{config[:stream_name]} #{json_message}, #{resp}.") if $logger
    raise NYPLError.new json_message, resp
  end

  $logger.info("Message sent to #{config[:stream_name]} #{json_message}, #{resp}") if $logger
  {
    "code" => "200",
    # Two-element array, matching the original `= json_message, resp` assignment.
    "message" => [json_message, resp]
  }
end
|
#push_records ⇒ Object
98
99
100
101
|
# File 'lib/kinesis_client.rb', line 98
# Flushes the pending batch: hands the queued records to push_batch in groups
# of at most @batch_size, then replaces the queue with a new empty array.
#
# @return [Array] the new, empty batch
def push_records
  @batch.each_slice(@batch_size) do |chunk|
    push_batch(chunk)
  end
  @batch = []
end
|
#push_to_batch(json_message) ⇒ Object
66
67
68
69
70
71
72
73
74
75
|
# File 'lib/kinesis_client.rb', line 66
# Converts a message to a record and appends it to the pending batch, flushing
# via push_records once the batch reaches @batch_size (when auto-push is on).
#
# A message that fails Avro encoding is logged and skipped. The log call is now
# guarded, so a missing $logger no longer turns the skip into a NoMethodError.
#
# @param json_message [Object] the message to enqueue
# @return [Object, nil] the result of push_records when a flush happened, else nil
def push_to_batch(json_message)
  begin
    @batch << convert_to_record(json_message)
  rescue AvroError => e
    $logger.error("message" => "Avro encoding error #{e.message} for #{json_message}") if $logger
  end

  push_records if @automatically_push && @batch.length >= @batch_size
end
|