Class: LogStashLogger::Device::AwsStream
Constant Summary
collapse
- DEFAULT_REGION =
'us-east-1'
- DEFAULT_STREAM =
'logstash'
Class Attribute Summary collapse
Instance Attribute Summary collapse
Attributes inherited from Connectable
#buffer_logger
Attributes inherited from Base
#error_logger, #io, #sync
Instance Method Summary
collapse
Methods inherited from Connectable
#close, #connected?, #flush, #on_full_buffer_receive, #reconnect, #to_io, #write
Methods included from Buffer
#buffer_flush, #buffer_full?, #buffer_initialize, #buffer_receive, #reset_buffer
Methods inherited from Base
#close, #flush, #to_io, #unrecoverable_error?, #write
Constructor Details
#initialize(opts) ⇒ AwsStream
Returns a new instance of AwsStream.
19
20
21
22
23
24
25
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 19
def initialize(opts)
super
@access_key_id = opts[:aws_access_key_id] || ENV['AWS_ACCESS_KEY_ID']
@secret_access_key = opts[:aws_secret_access_key] || ENV['AWS_SECRET_ACCESS_KEY']
@aws_region = opts[:aws_region] || DEFAULT_REGION
@stream = opts[:stream] || DEFAULT_STREAM
end
|
Class Attribute Details
.recoverable_error_codes ⇒ Object
Returns the value of attribute recoverable_error_codes.
14
15
16
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 14
def recoverable_error_codes
@recoverable_error_codes
end
|
.stream_class ⇒ Object
Returns the value of attribute stream_class.
14
15
16
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 14
def stream_class
@stream_class
end
|
Instance Attribute Details
#aws_region ⇒ Object
Returns the value of attribute aws_region.
17
18
19
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 17
def aws_region
@aws_region
end
|
#stream ⇒ Object
Returns the value of attribute stream.
17
18
19
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 17
def stream
@stream
end
|
Instance Method Details
#close! ⇒ Object
84
85
86
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 84
def close!
@io = nil
end
|
#connect ⇒ Object
43
44
45
46
47
48
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 43
def connect
@io = self.class.stream_class.new(
region: @aws_region,
credentials: ::Aws::Credentials.new(@access_key_id, @secret_access_key)
)
end
|
#get_response_records(resp) ⇒ Object
39
40
41
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 39
def get_response_records(resp)
fail NotImplementedError
end
|
#is_successful_response(resp) ⇒ Object
35
36
37
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 35
def is_successful_response(resp)
fail NotImplementedError
end
|
#put_records(records) ⇒ Object
31
32
33
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 31
def put_records(records)
fail NotImplementedError
end
|
27
28
29
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 27
def transform_message(message)
fail NotImplementedError
end
|
#with_connection ⇒ Object
50
51
52
53
54
55
56
57
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 50
def with_connection
connect unless connected?
yield
rescue => e
log_error(e)
log_warning("giving up")
close(flush: false)
end
|
#write_batch(messages, group = nil) ⇒ Object
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 59
def write_batch(messages, group = nil)
records = messages.map{ |m| transform_message(m) }
with_connection do
resp = put_records(records)
if !is_successful_response(resp)
get_response_records(resp).each_with_index do |record, index|
if self.class.recoverable_error_codes.include?(record.error_code)
log_warning("Failed to post record using #{self.class.stream_class.name} with error: #{record.error_code} #{record.error_message}")
log_warning("Retrying")
write(records[index][:data])
elsif !record.error_code.nil? && record.error_code != ''
log_error("Failed to post record using #{self.class.stream_class.name} with error: #{record.error_code} #{record.error_message}")
end
end
end
end
end
|
#write_one(message) ⇒ Object
80
81
82
|
# File 'lib/logstash-logger/device/aws_stream.rb', line 80
def write_one(message)
write_batch([message])
end
|