Class: LogStashLogger::Device::AwsStream

Inherits:
Connectable show all
Defined in:
lib/logstash-logger/device/aws_stream.rb

Direct Known Subclasses

Firehose, Kinesis

Constant Summary collapse

# AWS region used by #initialize when the :aws_region option is not given.
# Frozen so that mutating a value returned by #aws_region cannot alter the constant.
DEFAULT_REGION = 'us-east-1'.freeze
# Stream name used by #initialize when the :stream option is not given.
# Frozen so that mutating a value returned by #stream cannot alter the constant.
DEFAULT_STREAM = 'logstash'.freeze

Class Attribute Summary collapse

Instance Attribute Summary collapse

Attributes inherited from Connectable

#buffer_logger

Attributes inherited from Base

#error_logger, #io, #sync

Instance Method Summary collapse

Methods inherited from Connectable

#close, #connected?, #flush, #on_full_buffer_receive, #reconnect, #to_io, #write

Methods included from Buffer

#buffer_flush, #buffer_full?, #buffer_initialize, #buffer_receive, #reset_buffer

Methods inherited from Base

#close, #flush, #to_io, #unrecoverable_error?, #write

Constructor Details

#initialize(opts) ⇒ AwsStream



19
20
21
22
23
24
25
# File 'lib/logstash-logger/device/aws_stream.rb', line 19

# Sets up the device from an options hash.
#
# The options are first forwarded unchanged to the parent initializer, then
# the AWS-specific settings are read from them:
#
# * :aws_access_key_id     - falls back to ENV['AWS_ACCESS_KEY_ID']
# * :aws_secret_access_key - falls back to ENV['AWS_SECRET_ACCESS_KEY']
# * :aws_region            - falls back to DEFAULT_REGION
# * :stream                - falls back to DEFAULT_STREAM
#
# A fallback is used whenever the option is missing, nil or false.
#
# @param opts [Hash] device options
def initialize(opts)
  super

  key_id = opts[:aws_access_key_id]
  secret = opts[:aws_secret_access_key]
  region = opts[:aws_region]
  stream_name = opts[:stream]

  @access_key_id = key_id || ENV['AWS_ACCESS_KEY_ID']
  @secret_access_key = secret || ENV['AWS_SECRET_ACCESS_KEY']
  @aws_region = region || DEFAULT_REGION
  @stream = stream_name || DEFAULT_STREAM
end

Class Attribute Details

.recoverable_error_codes ⇒ Object

Returns the value of attribute recoverable_error_codes



14
15
16
# File 'lib/logstash-logger/device/aws_stream.rb', line 14

# Class-level reader: returns the value stored in @recoverable_error_codes.
# write_batch checks each failed record's error code against this collection
# (via include?) to decide whether the record is retried.
def recoverable_error_codes
  @recoverable_error_codes
end

.stream_class ⇒ Object

Returns the value of attribute stream_class



14
15
16
# File 'lib/logstash-logger/device/aws_stream.rb', line 14

# Class-level reader: returns the value stored in @stream_class.
# connect instantiates this class to create the client, and write_batch uses
# its name in log messages.
def stream_class
  @stream_class
end

Instance Attribute Details

#aws_region ⇒ Object

Returns the value of attribute aws_region



17
18
19
# File 'lib/logstash-logger/device/aws_stream.rb', line 17

# Returns the AWS region held in @aws_region, which #initialize sets from the
# :aws_region option or DEFAULT_REGION.
def aws_region
  @aws_region
end

#stream ⇒ Object

Returns the value of attribute stream



17
18
19
# File 'lib/logstash-logger/device/aws_stream.rb', line 17

# Returns the stream name held in @stream, which #initialize sets from the
# :stream option or DEFAULT_STREAM.
def stream
  @stream
end

Instance Method Details

#close! ⇒ Object



84
85
86
# File 'lib/logstash-logger/device/aws_stream.rb', line 84

# Drops the reference to the current client by resetting @io to nil.
# Nothing is called on the previous object; connect assigns a fresh client
# to @io when it is next invoked.
def close!
  @io = nil
end

#connect ⇒ Object



43
44
45
46
47
48
# File 'lib/logstash-logger/device/aws_stream.rb', line 43

# Builds the AWS client for this device and stores it in @io.
#
# The client class is taken from the class-level stream_class attribute and
# is created for @aws_region. Explicit credentials are passed only when both
# an access key id and a secret access key are configured. Otherwise the
# :credentials option is omitted, so the client is not handed an
# Aws::Credentials object built from nil values and the AWS SDK can resolve
# credentials through its default provider chain.
#
# @return [Object] the newly created client (also assigned to @io)
def connect
  client_options = { region: @aws_region }

  if @access_key_id && @secret_access_key
    client_options[:credentials] = ::Aws::Credentials.new(@access_key_id, @secret_access_key)
  end

  @io = self.class.stream_class.new(**client_options)
end

#get_response_records(resp) ⇒ Object



39
40
41
# File 'lib/logstash-logger/device/aws_stream.rb', line 39

# Abstract hook: extracts the per-record results from a put_records response.
#
# write_batch iterates the returned collection with each_with_index, reads
# error_code and error_message from every entry, and uses the index to find
# the matching record it sent, so entries must be in the order of the request.
#
# @param resp [Object] the response returned by put_records
# @return [Enumerable] per-record results
# @raise [NotImplementedError] always, until a subclass overrides it
def get_response_records(resp)
  raise NotImplementedError, "#{self.class.name} must implement #get_response_records"
end

#is_successful_response(resp) ⇒ Object



35
36
37
# File 'lib/logstash-logger/device/aws_stream.rb', line 35

# Abstract hook: tells whether a put_records response is free of failures.
#
# write_batch inspects the individual records (and possibly retries them)
# only when this returns a falsy value.
#
# @param resp [Object] the response returned by put_records
# @return [Boolean] truthy when no record in the batch failed
# @raise [NotImplementedError] always, until a subclass overrides it
def is_successful_response(resp)
  raise NotImplementedError, "#{self.class.name} must implement #is_successful_response"
end

#put_records(records) ⇒ Object



31
32
33
# File 'lib/logstash-logger/device/aws_stream.rb', line 31

# Abstract hook: sends a batch of transformed records to the stream.
#
# write_batch calls this with the array obtained by mapping every message
# through transform_message, then passes the result to
# is_successful_response and get_response_records.
#
# @param records [Array] transformed records to send
# @return [Object] the response for the whole batch
# @raise [NotImplementedError] always, until a subclass overrides it
def put_records(records)
  raise NotImplementedError, "#{self.class.name} must implement #put_records"
end

#transform_message(message) ⇒ Object



27
28
29
# File 'lib/logstash-logger/device/aws_stream.rb', line 27

# Abstract hook: converts one log message into the record sent to the stream.
#
# write_batch re-queues a failed record with write(record[:data]), so the
# returned object must expose the payload under the :data key.
#
# @param message [Object] a single buffered log message
# @return [Object] the record to pass to put_records
# @raise [NotImplementedError] always, until a subclass overrides it
def transform_message(message)
  raise NotImplementedError, "#{self.class.name} must implement #transform_message"
end

#with_connection ⇒ Object



50
51
52
53
54
55
56
57
# File 'lib/logstash-logger/device/aws_stream.rb', line 50

# Runs the given block with a live connection.
#
# Connects first when connected? is false, then yields. Any StandardError
# raised while connecting or inside the block is logged, followed by a
# "giving up" warning, and the device is closed without flushing; the error
# is not re-raised.
#
# @yield the work to perform while connected
# @return [Object] the block's value, or the result of close on failure
def with_connection
  begin
    connect unless connected?
    yield
  rescue StandardError => error
    log_error(error)
    log_warning("giving up")
    close(flush: false)
  end
end

#write_batch(messages, group = nil) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/logstash-logger/device/aws_stream.rb', line 59

# Sends a batch of messages to the stream and re-queues retryable failures.
#
# Every message is converted with transform_message and the resulting records
# are submitted through put_records inside with_connection. When the response
# is not successful, each per-record result is examined:
#
# * error code listed in the class's recoverable_error_codes: two warnings
#   are logged and the record's :data is handed back to write
# * any other non-empty error code: an error is logged and the record dropped
# * nil or empty error code: nothing is done for that record
#
# @param messages [Array] messages to send
# @param group [Object, nil] accepted but not used by this method
def write_batch(messages, group = nil)
  records = messages.map { |message| transform_message(message) }

  with_connection do
    response = put_records(records)
    next if is_successful_response(response)

    # Put any failed records back into the buffer
    get_response_records(response).each_with_index do |entry, position|
      code = entry.error_code
      retryable = self.class.recoverable_error_codes.include?(code)
      next unless retryable || !(code.nil? || code == '')

      failure = "Failed to post record using #{self.class.stream_class.name} with error: #{code} #{entry.error_message}"

      if retryable
        log_warning(failure)
        log_warning("Retrying")
        write(records[position][:data])
      else
        log_error(failure)
      end
    end
  end
end

#write_one(message) ⇒ Object



80
81
82
# File 'lib/logstash-logger/device/aws_stream.rb', line 80

# Writes a single message by wrapping it in a one-element batch and
# delegating to write_batch.
#
# @param message [Object] the message to send
# @return [Object] whatever write_batch returns
def write_one(message)
  single_batch = [message]
  write_batch(single_batch)
end