Class: Fluent::Plugin::LokiOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_loki.rb

Overview

Subclass of Fluent Plugin Output

Defined Under Namespace

Classes: LogPostError

Constant Summary collapse

DEFAULT_BUFFER_TYPE =
'memory'

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#record_accessorsObject

Returns the value of attribute record_accessors.



34
35
36
# File 'lib/fluent/plugin/out_loki.rb', line 34

# Reader for the @record_accessors instance variable.
#
# #configure fills it with a Hash keyed by the names found in the `label`
# config sections, each mapped to the object returned by
# record_accessor_create.
#
# @return [Hash, nil] the accessor table; nil until #configure has assigned it
def record_accessors
  @record_accessors
end

Instance Method Details

#client_cert_configured?Boolean

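Checks whether both a client certificate and its private key are configured. (Source comment picked up as the docstring: rubocop:enable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity)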

Returns:

  • (Boolean)


131
132
133
# File 'lib/fluent/plugin/out_loki.rb', line 131

# Tells whether client-certificate authentication has been configured.
#
# Both @key and @cert must be set (non-nil); one without the other does
# not count.
#
# @return [Boolean] true only when neither @key nor @cert is nil
def client_cert_configured?
  [@key, @cert].none?(&:nil?)
end

#configure(conf) ⇒ Object

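Validates the plugin configuration and prepares the derived state (push URI, label accessors, client certificate, bearer token). (Source comment picked up as the docstring: rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity)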



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/fluent/plugin/out_loki.rb', line 85

# Validates the configuration and prepares the state derived from it.
#
# In order, this:
# * converts legacy buffer parameters and runs the parent configuration,
# * builds the push URI from @url and rejects non-HTTP(S) schemes,
# * creates one record accessor per key of every `label` section and one
#   per entry of @remove_keys,
# * loads and validates the client certificate/key when both are configured,
# * reads the bearer token from @bearer_token_file when one is given,
# * checks that the configured CA certificate file exists.
#
# @param conf [Object] the configuration element handed in by Fluentd
# @raise [Fluent::ConfigError] if the URL does not have an HTTP/HTTPS scheme
# @raise [RuntimeError] if the bearer token file is missing or empty, or if
#   the CA certificate file is missing
def configure(conf)
  compat_parameters_convert(conf, :buffer)
  super
  @uri = URI.parse("#{@url}/loki/api/v1/push")
  unless @uri.is_a?(URI::HTTP) || @uri.is_a?(URI::HTTPS)
    raise Fluent::ConfigError, 'URL parameter must have HTTP/HTTPS scheme'
  end

  @record_accessors = {}
  conf.elements.select { |element| element.name == 'label' }.each do |element|
    element.each_pair do |k, v|
      element.has_key?(k) # rubocop:disable Style/PreferredHashMethods #to suppress unread configuration warning
      # An empty value means "use the label name itself as the record key".
      v = k if v.empty?
      @record_accessors[k] = record_accessor_create(v)
    end
  end
  @remove_keys_accessors = @remove_keys.map { |key| record_accessor_create(key) }

  # If configured, load and validate client certificate (and corresponding key)
  if client_cert_configured?
    load_client_cert
    validate_client_cert_key
  end

  @auth_token_bearer = nil
  unless @bearer_token_file.nil?
    raise "bearer_token_file #{@bearer_token_file} not found" unless File.exist?(@bearer_token_file)

    # Read the file once, assume long-lived authentication token.
    # Surrounding whitespace is stripped: token files commonly end with a
    # newline, and Net::HTTP rejects CR/LF inside header values.
    # NOTE(review): assumes the token is sent in an HTTP header by code not
    # visible here - confirm against loki_http_request.
    @auth_token_bearer = File.read(@bearer_token_file).strip
    raise "bearer_token_file #{@bearer_token_file} is empty" if @auth_token_bearer.empty?

    log.info "will use Bearer token from bearer_token_file #{@bearer_token_file} in Authorization header"
  end

  raise "CA certificate file #{@ca_cert} not found" if !@ca_cert.nil? && !File.exist?(@ca_cert)
end

#generic_to_loki(chunk) ⇒ Object



204
205
206
207
208
# File 'lib/fluent/plugin/out_loki.rb', line 204

# Converts a buffer chunk into the payload sent to Loki.
#
# The chunk is first turned into streams by chunk_to_loki, and those
# streams are then handed to payload_builder.
#
# @param chunk [Object] the chunk passed through to chunk_to_loki
# @return [Object] whatever payload_builder returns for the streams
def generic_to_loki(chunk)
  payload_builder(chunk_to_loki(chunk))
end

#http_request_opts(uri) ⇒ Object



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/fluent/plugin/out_loki.rb', line 174

# Assembles the connection options for a request to +uri+.
#
# The result always carries :use_ssl; the TLS-related entries are added
# only when the corresponding setting is present.
# NOTE(review): presumably consumed by Net::HTTP.start in the caller - confirm.
#
# @param uri [URI] target URI; only its scheme is inspected
# @return [Hash] options with :use_ssl and, when applicable, :verify_mode,
#   :cert, :key and :ca_file
def http_request_opts(uri)
  settings = { use_ssl: uri.scheme == 'https' }

  # Optionally disable server certificate verification.
  settings[:verify_mode] = OpenSSL::SSL::VERIFY_NONE if @insecure_tls

  # Optionally present client certificate (needs both certificate and key).
  unless @cert.nil? || @key.nil?
    settings[:cert] = @cert
    settings[:key] = @key
  end

  # For server certificate verification: set custom CA bundle.
  # Only takes effect when `insecure_tls` is not set.
  settings[:ca_file] = @ca_cert unless @ca_cert.nil?

  settings
end

#load_client_certObject



135
136
137
138
# File 'lib/fluent/plugin/out_loki.rb', line 135

# Replaces the configured certificate and key file paths with parsed objects.
#
# When @cert is set, the file it names is read and parsed as an X.509
# certificate; when @key is set, the file it names is read and parsed as a
# private key. Either one is left untouched when it is nil/false.
#
# @return [Object, nil] the value of the last assignment (not meaningful)
def load_client_cert
  @cert &&= OpenSSL::X509::Certificate.new(File.read(@cert))
  @key &&= OpenSSL::PKey.read(File.read(@key))
end

#multi_workers_ready?Boolean

Returns:

  • (Boolean)


146
147
148
# File 'lib/fluent/plugin/out_loki.rb', line 146

# Always answers true.
# NOTE(review): presumably the Fluentd hook by which an output declares that
# it may run under multiple workers - confirm against the Fluentd plugin API.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end

#validate_client_cert_keyObject



140
141
142
143
144
# File 'lib/fluent/plugin/out_loki.rb', line 140

# Ensures the loaded client key is of a supported type.
#
# Only RSA and DSA private keys are accepted. The error message reports the
# class of @key, the same object that is being checked.
#
# @return [nil] when the key type is supported
# @raise [RuntimeError] if @key is neither an RSA nor a DSA key
def validate_client_cert_key
  return if @key.is_a?(OpenSSL::PKey::RSA) || @key.is_a?(OpenSSL::PKey::DSA)

  raise "Unsupported private key type #{@key.class}"
end

#write(chunk) ⇒ Object

Flushes a chunk to Loki.

Raises:



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/fluent/plugin/out_loki.rb', line 151

# Sends one buffer chunk to Loki.
#
# The chunk is converted to streams, wrapped in a `streams` body and posted
# through loki_http_request. A successful response is logged at debug level.
# Any other response is logged as a warning together with a dump of the
# body; only 429 and 5xx responses raise, every other failure returns
# without raising.
#
# @param chunk [Object] the chunk to convert and send
# @return [nil]
# @raise [LogPostError] when the response is 429 Too Many Requests or a
#   server error
def write(chunk)
  # streams by label
  body = { 'streams' => generic_to_loki(chunk) }
  tenant = @tenant ? extract_placeholders(@tenant, chunk) : nil

  # add ingest path to loki url
  res = loki_http_request(body, tenant)

  if res.is_a?(Net::HTTPSuccess)
    log.debug "POST request was responded to with status code #{res.code}"
    return
  end

  failure = "#{res.code} #{res.message} #{res.body}"
  log.warn "failed to write post to #{@uri} (#{failure})"
  log.debug Yajl.dump(body)

  # Only retry 429 and 500s
  retryable = res.is_a?(Net::HTTPTooManyRequests) || res.is_a?(Net::HTTPServerError)
  raise(LogPostError, failure) if retryable
end