Class: Fluent::Plugin::LokiOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_loki.rb

Overview

Subclass of Fluent Plugin Output

Defined Under Namespace

Classes: LogPostError

Constant Summary collapse

DEFAULT_BUFFER_TYPE =
'memory'

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#record_accessors ⇒ Object

Returns the value of attribute record_accessors.



34
35
36
# File 'lib/fluent/plugin/out_loki.rb', line 34

# Reader for the @record_accessors instance variable.
#
# #configure assigns this a Hash and fills it with one entry per key found
# in the <label> configuration sections, each value being whatever
# record_accessor_create returns for that key.
#
# @return [Hash, nil] the stored map; nil if nothing has assigned it yet
def record_accessors
  @record_accessors
end

Instance Method Details

#configure(conf) ⇒ Object

rubocop:disable Metrics/CyclomaticComplexity



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/fluent/plugin/out_loki.rb', line 75

# Validates the plugin configuration and prepares derived state.
#
# Steps, in order:
# 1. converts legacy buffer parameters and runs the parent configure;
# 2. builds the push endpoint URI by appending '/loki/api/v1/push' to @url
#    and rejects anything that does not parse as an HTTP(S) URI;
# 3. creates one record accessor per key in every <label> section
#    (an empty value means "use the key itself as the accessor path");
# 4. creates one record accessor per entry in @remove_keys;
# 5. when both a client key and certificate are configured, loads them and
#    checks the key type;
# 6. verifies that the configured CA certificate file, if any, exists.
#
# @param conf the configuration element handed to the plugin
# @raise [Fluent::ConfigError] if the URL is not HTTP/HTTPS
# @raise [RuntimeError] if the CA certificate file is missing
# @return [nil]
def configure(conf) # rubocop:disable Metrics/CyclomaticComplexity
  compat_parameters_convert(conf, :buffer)
  super

  @uri = URI.parse(@url + '/loki/api/v1/push')
  http_like = @uri.is_a?(URI::HTTP) || @uri.is_a?(URI::HTTPS)
  raise Fluent::ConfigError, 'url parameter must be valid HTTP' unless http_like

  @record_accessors = {}
  label_sections = conf.elements.select { |section| section.name == 'label' }
  label_sections.each do |section|
    section.each_pair do |label, path|
      section.has_key?(label) # rubocop:disable Style/PreferredHashMethods #to suppress unread configuration warning
      path = label if path.empty?
      @record_accessors[label] = record_accessor_create(path)
    end
  end

  @remove_keys_accessors = []
  @remove_keys.each do |removed_key|
    @remove_keys_accessors << record_accessor_create(removed_key)
  end

  if ssl_cert?
    load_ssl
    validate_ssl_key
  end

  # Nothing left to check when no CA file is configured or the file is present.
  return if @ca_cert.nil? || File.exist?(@ca_cert)

  raise "CA certificate file #{@ca_cert} not found"
end

#generic_to_loki(chunk) ⇒ Object



170
171
172
173
174
175
# File 'lib/fluent/plugin/out_loki.rb', line 170

# Converts a buffer chunk into the payload that gets posted to Loki.
#
# The chunk is first turned into streams by chunk_to_loki, and those streams
# are then handed to payload_builder; its result is returned unchanged.
#
# @param chunk the buffer chunk to convert
# @return whatever payload_builder returns for the chunk's streams
def generic_to_loki(chunk)
  # log.debug("GenericToLoki: converting #{chunk}")
  payload_builder(chunk_to_loki(chunk))
end

#http_opts(uri) ⇒ Object



123
124
125
126
127
128
# File 'lib/fluent/plugin/out_loki.rb', line 123

# Builds the connection options for a plain (no client certificate) request.
#
# @param uri [URI] the target URI; only its scheme is inspected
# @return [Hash] a single-entry hash, +use_ssl+ being true only for 'https'
def http_opts(uri)
  { use_ssl: uri.scheme == 'https' }
end

#load_ssl ⇒ Object



108
109
110
111
# File 'lib/fluent/plugin/out_loki.rb', line 108

# Replaces the configured certificate and key *paths* with parsed OpenSSL
# objects.
#
# @cert and @key initially hold file paths; after this call each one that was
# set holds the object parsed from that file. Unset values are left alone.
#
# @return the parsed private key, or nil when no key is configured
def load_ssl
  if @cert
    cert_pem = File.read(@cert)
    @cert = OpenSSL::X509::Certificate.new(cert_pem)
  end
  return unless @key

  @key = OpenSSL::PKey.read(File.read(@key))
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


119
120
121
# File 'lib/fluent/plugin/out_loki.rb', line 119

# Unconditionally answers true.
#
# NOTE(review): presumably this is the hook Fluentd consults to decide whether
# the plugin may run with multiple workers — confirm against the Fluentd
# plugin API.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end

#ssl_cert? ⇒ Boolean

Returns:

  • (Boolean)


104
105
106
# File 'lib/fluent/plugin/out_loki.rb', line 104

# Tells whether a client certificate pair is configured.
#
# @return [Boolean] true only when neither @key nor @cert is nil
def ssl_cert?
  !(@key.nil? || @cert.nil?)
end

#ssl_opts(uri) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/fluent/plugin/out_loki.rb', line 149

# Builds the connection options for a request that may use TLS client
# authentication and/or a custom CA bundle.
#
# The result always carries +use_ssl+. When both @cert and @key are set it
# additionally carries +verify_mode+ (VERIFY_PEER), +cert+ and +key+; when
# @ca_cert is set it carries +ca_file+.
#
# @param uri [URI] the target URI; only its scheme is inspected
# @return [Hash] the assembled options
def ssl_opts(uri)
  options = { use_ssl: uri.scheme == 'https' }

  unless @cert.nil? || @key.nil?
    options[:verify_mode] = OpenSSL::SSL::VERIFY_PEER
    options[:cert] = @cert
    options[:key] = @key
  end

  options[:ca_file] = @ca_cert unless @ca_cert.nil?
  options
end

#validate_ssl_key ⇒ Object



113
114
115
116
117
# File 'lib/fluent/plugin/out_loki.rb', line 113

# Ensures the loaded private key is of a supported type.
#
# Only RSA and DSA keys are accepted. The error message reports the class of
# @key itself — the same instance variable the type check inspects — so the
# message does not depend on a separate `key` reader being available.
#
# @raise [RuntimeError] if @key is neither an RSA nor a DSA key
# @return [nil] when the key is acceptable
def validate_ssl_key
  return if @key.is_a?(OpenSSL::PKey::RSA) || @key.is_a?(OpenSSL::PKey::DSA)

  raise "Unsupported private key type #{@key.class}"
end

#write(chunk) ⇒ Object

flush a chunk to loki

Raises:

  • (LogPostError)



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/fluent/plugin/out_loki.rb', line 131

# Flushes one buffer chunk to Loki.
#
# The chunk is converted with generic_to_loki, wrapped under the 'streams'
# key and sent through loki_http_request. Any non-success response is logged
# (a warning with the status summary, plus the serialized body at debug
# level). Only HTTP 429 and 5xx responses raise, so that those are the only
# failures that get retried; other failures are logged and dropped.
#
# @param chunk the buffer chunk to send
# @raise [LogPostError] on a 429 or 5xx response
# @return [nil]
def write(chunk)
  request_body = { 'streams' => generic_to_loki(chunk) }

  response = loki_http_request(request_body)
  return if response.is_a?(Net::HTTPSuccess)

  summary = "#{response.code} #{response.message} #{response.body}"
  log.warn "failed to write post to #{@uri} (#{summary})"
  log.debug Yajl.dump(request_body)

  # Only retry 429 and 500s
  case response
  when Net::HTTPTooManyRequests, Net::HTTPServerError
    raise LogPostError, summary
  end
end