Class: Fluent::Plugin::LokiOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::LokiOutput
- Defined in:
- lib/fluent/plugin/out_loki.rb
Overview
Subclass of Fluent Plugin Output
Constant Summary collapse
- DEFAULT_BUFFER_TYPE =
'memory'
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #generic_to_loki(chunk) ⇒ Object
- #http_opts(uri) ⇒ Object
- #multi_workers_ready? ⇒ Boolean
-
#write(chunk) ⇒ Object
flush a chunk to loki.
Instance Method Details
#configure(conf) ⇒ Object
64 65 66 67 68 69 70 |
# File 'lib/fluent/plugin/out_loki.rb', line 64 def configure(conf) compat_parameters_convert(conf, :buffer) super @label_keys = @label_keys.split(/\s*,\s*/) if @label_keys @remove_keys = @remove_keys.split(',').map(&:strip) if @remove_keys end |
#generic_to_loki(chunk) ⇒ Object
116 117 118 119 120 121 |
# File 'lib/fluent/plugin/out_loki.rb', line 116 def generic_to_loki(chunk) # log.debug("GenericToLoki: converting #{chunk}") streams = chunk_to_loki(chunk) payload = payload_builder(streams) payload end |
#http_opts(uri) ⇒ Object
76 77 78 79 80 81 |
# File 'lib/fluent/plugin/out_loki.rb', line 76 def http_opts(uri) opts = { use_ssl: uri.scheme == 'https' } opts end |
#multi_workers_ready? ⇒ Boolean
72 73 74 |
# File 'lib/fluent/plugin/out_loki.rb', line 72 def multi_workers_ready? true end |
#write(chunk) ⇒ Object
flush a chunk to loki
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/fluent/plugin/out_loki.rb', line 84 def write(chunk) # streams by label payload = generic_to_loki(chunk) body = { 'streams': payload } # add ingest path to loki url uri = URI.parse(url + '/api/prom/push') req = Net::HTTP::Post.new( uri.request_uri ) req.add_field('Content-Type', 'application/json') req.add_field('X-Scope-OrgID', @tenant) if @tenant req.body = Yajl.dump(body) req.basic_auth(@username, @password) if @username opts = { use_ssl: uri.scheme == 'https' } log.debug "sending #{req.body.length} bytes to loki" res = Net::HTTP.start(uri.hostname, uri.port, **opts) { |http| http.request(req) } unless res && res.is_a?(Net::HTTPSuccess) res_summary = if res "#{res.code} #{res.} #{res.body}" else 'res=nil' end log.warn "failed to #{req.method} #{uri} (#{res_summary})" log.warn Yajl.dump(body) end end |