Class: Fluent::Plugin::LokiOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::LokiOutput
- Defined in:
- lib/fluent/plugin/out_loki.rb
Overview
Subclass of Fluent Plugin Output
Defined Under Namespace
Classes: LogPostError
Constant Summary collapse
- DEFAULT_BUFFER_TYPE =
'memory'
Instance Attribute Summary collapse
-
#record_accessors ⇒ Object
Returns the value of attribute record_accessors.
Instance Method Summary collapse
-
#configure(conf) ⇒ Object
rubocop:disable Metrics/CyclomaticComplexity.
- #generic_to_loki(chunk) ⇒ Object
- #http_opts(uri) ⇒ Object
- #load_ssl ⇒ Object
- #multi_workers_ready? ⇒ Boolean
- #ssl_cert? ⇒ Boolean
- #ssl_opts(uri) ⇒ Object
- #validate_ssl_key ⇒ Object
-
#write(chunk) ⇒ Object
flush a chunk to loki.
Instance Attribute Details
#record_accessors ⇒ Object
Returns the value of attribute record_accessors.
34 35 36 |
# File 'lib/fluent/plugin/out_loki.rb', line 34 def record_accessors @record_accessors end |
Instance Method Details
#configure(conf) ⇒ Object
rubocop:disable Metrics/CyclomaticComplexity
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/fluent/plugin/out_loki.rb', line 75 def configure(conf) # rubocop:disable Metrics/CyclomaticComplexity compat_parameters_convert(conf, :buffer) super @uri = URI.parse(@url + '/loki/api/v1/push') unless @uri.is_a?(URI::HTTP) || @uri.is_a?(URI::HTTPS) raise Fluent::ConfigError, 'url parameter must be valid HTTP' end @record_accessors = {} conf.elements.select { |element| element.name == 'label' }.each do |element| element.each_pair do |k, v| element.has_key?(k) # rubocop:disable Style/PreferredHashMethods #to suppress unread configuration warning v = k if v.empty? @record_accessors[k] = record_accessor_create(v) end end @remove_keys_accessors = [] @remove_keys.each do |key| @remove_keys_accessors.push(record_accessor_create(key)) end if ssl_cert? load_ssl validate_ssl_key end raise "CA certificate file #{@ca_cert} not found" if !@ca_cert.nil? && !File.exist?(@ca_cert) end |
#generic_to_loki(chunk) ⇒ Object
170 171 172 173 174 175 |
# File 'lib/fluent/plugin/out_loki.rb', line 170 def generic_to_loki(chunk) # log.debug("GenericToLoki: converting #{chunk}") streams = chunk_to_loki(chunk) payload = payload_builder(streams) payload end |
#http_opts(uri) ⇒ Object
123 124 125 126 127 128 |
# File 'lib/fluent/plugin/out_loki.rb', line 123 def http_opts(uri) opts = { use_ssl: uri.scheme == 'https' } opts end |
#load_ssl ⇒ Object
108 109 110 111 |
# File 'lib/fluent/plugin/out_loki.rb', line 108 def load_ssl @cert = OpenSSL::X509::Certificate.new(File.read(@cert)) if @cert @key = OpenSSL::PKey.read(File.read(@key)) if @key end |
#multi_workers_ready? ⇒ Boolean
119 120 121 |
# File 'lib/fluent/plugin/out_loki.rb', line 119 def multi_workers_ready? true end |
#ssl_cert? ⇒ Boolean
104 105 106 |
# File 'lib/fluent/plugin/out_loki.rb', line 104 def ssl_cert? !@key.nil? && !@cert.nil? end |
#ssl_opts(uri) ⇒ Object
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/fluent/plugin/out_loki.rb', line 149 def ssl_opts(uri) opts = { use_ssl: uri.scheme == 'https' } if !@cert.nil? && !@key.nil? opts = opts.merge( verify_mode: OpenSSL::SSL::VERIFY_PEER, cert: @cert, key: @key ) end unless @ca_cert.nil? opts = opts.merge( ca_file: @ca_cert ) end opts end |
#validate_ssl_key ⇒ Object
113 114 115 116 117 |
# File 'lib/fluent/plugin/out_loki.rb', line 113 def validate_ssl_key if !@key.is_a?(OpenSSL::PKey::RSA) && !@key.is_a?(OpenSSL::PKey::DSA) raise "Unsupported private key type #{key.class}" end end |
#write(chunk) ⇒ Object
flush a chunk to loki
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/fluent/plugin/out_loki.rb', line 131 def write(chunk) # streams by label payload = generic_to_loki(chunk) body = { 'streams' => payload } # add ingest path to loki url res = loki_http_request(body) return if res.is_a?(Net::HTTPSuccess) res_summary = "#{res.code} #{res.} #{res.body}" log.warn "failed to write post to #{@uri} (#{res_summary})" log.debug Yajl.dump(body) # Only retry 429 and 500s raise(LogPostError, res_summary) if res.is_a?(Net::HTTPTooManyRequests) || res.is_a?(Net::HTTPServerError) end |