Class: Fluent::Plugin::LokiOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::LokiOutput
- Defined in:
- lib/fluent/plugin/out_loki.rb
Constant Summary collapse
- DEFAULT_BUFFER_TYPE =
"memory"
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #create_request(tag, time, record) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#format_labels(labels) ⇒ Object
end send_request.
- #format_url(tag, time, record) ⇒ Object
- #formatted_to_msgpack_binary? ⇒ Boolean
- #handle_record(tag, time, record) ⇒ Object
- #http_opts(uri) ⇒ Object
-
#initialize ⇒ LokiOutput
constructor
A new instance of LokiOutput.
- #multi_workers_ready? ⇒ Boolean
- #prefer_buffered_processing ⇒ Object
- #process(tag, es) ⇒ Object
- #proxies ⇒ Object
- #send_request(req, uri) ⇒ Object
- #set_body(req, tag, time, record) ⇒ Object
- #set_header(req, tag, time, record) ⇒ Object
- #set_json_body(req, data) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ LokiOutput
Returns a new instance of LokiOutput.
14 15 16 |
# File 'lib/fluent/plugin/out_loki.rb', line 14 def initialize super end |
Instance Method Details
#configure(conf) ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/fluent/plugin/out_loki.rb', line 59 def configure(conf) compat_parameters_convert(conf, :buffer) super @encoder = case @encoder when :yajl Yajl when :json JSON end @ssl_verify_mode = if @ssl_no_verify OpenSSL::SSL::VERIFY_NONE else OpenSSL::SSL::VERIFY_PEER end @ca_file = @cacert_file @last_request_time = nil raise Fluent::ConfigError, "'tag' in chunk_keys is required." if !@chunk_key_tag && @buffered end |
#create_request(tag, time, record) ⇒ Object
116 117 118 119 120 121 122 123 |
# File 'lib/fluent/plugin/out_loki.rb', line 116 def create_request(tag, time, record) url = format_url(tag, time, record) uri = URI.parse(url+"/api/prom/push") req = Net::HTTP::Post.new(uri.request_uri) set_body(req, tag, time, record) set_header(req, tag, time, record) return req, uri end |
#format(tag, time, record) ⇒ Object
201 202 203 |
# File 'lib/fluent/plugin/out_loki.rb', line 201 def format(tag, time, record) [time, record].to_msgpack end |
#format_labels(labels) ⇒ Object
end send_request
182 183 184 185 186 187 188 189 |
# File 'lib/fluent/plugin/out_loki.rb', line 182 def format_labels(labels) st = "{" labels.each do |key, val| st+= "#{key}=\"#{val}\"," end st[-1]="}" return st end |
#format_url(tag, time, record) ⇒ Object
88 89 90 |
# File 'lib/fluent/plugin/out_loki.rb', line 88 def format_url(tag, time, record) @endpoint_url end |
#formatted_to_msgpack_binary? ⇒ Boolean
205 206 207 |
# File 'lib/fluent/plugin/out_loki.rb', line 205 def formatted_to_msgpack_binary? true end |
#handle_record(tag, time, record) ⇒ Object
190 191 192 193 194 195 |
# File 'lib/fluent/plugin/out_loki.rb', line 190 def handle_record(tag, time, record) rec = {"streams"=>[{"labels"=>format_labels(@labels), "entries"=>[{"ts"=>Time.now.iso8601(3), "line"=>record.to_json}]}]} # I used time now instead of at 'time' because it cause 'Entry out of order' on loki's side req, uri = create_request(tag, time, rec) send_request(req, uri) end |
#http_opts(uri) ⇒ Object
125 126 127 128 129 130 131 132 |
# File 'lib/fluent/plugin/out_loki.rb', line 125 def http_opts(uri) opts = { :use_ssl => uri.scheme == 'https' } opts[:verify_mode] = @ssl_verify_mode if opts[:use_ssl] opts[:ca_file] = File.join(@ca_file) if File.file?(@ca_file) opts end |
#multi_workers_ready? ⇒ Boolean
209 210 211 |
# File 'lib/fluent/plugin/out_loki.rb', line 209 def multi_workers_ready? true end |
#prefer_buffered_processing ⇒ Object
197 198 199 |
# File 'lib/fluent/plugin/out_loki.rb', line 197 def prefer_buffered_processing @buffered end |
#process(tag, es) ⇒ Object
213 214 215 216 217 |
# File 'lib/fluent/plugin/out_loki.rb', line 213 def process(tag, es) es.each do |time, record| handle_record(tag, time, record) end end |
#proxies ⇒ Object
134 135 136 |
# File 'lib/fluent/plugin/out_loki.rb', line 134 def proxies ENV['HTTPS_PROXY'] || ENV['HTTP_PROXY'] || ENV['http_proxy'] || ENV['https_proxy'] end |
#send_request(req, uri) ⇒ Object
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/fluent/plugin/out_loki.rb', line 138 def send_request(req, uri) is_rate_limited = (@rate_limit_msec != 0 and not @last_request_time.nil?) if is_rate_limited and ((Time.now.to_f - @last_request_time) * 1000.0 < @rate_limit_msec) log.info('Dropped request due to rate limiting') return end res = nil begin if @authentication == :basic req.basic_auth(@username, @password) elsif @authentication == :bearer req['authorization'] = "bearer #{@token}" elsif @authentication == :jwt req['authorization'] = "jwt #{@token}" end @last_request_time = Time.now.to_f if proxy = proxies proxy_uri = URI.parse(proxy) res = Net::HTTP.start(uri.host, uri.port, proxy_uri.host, proxy_uri.port, proxy_uri.user, proxy_uri.password, **http_opts(uri)) {|http| http.request(req) } else res = Net::HTTP.start(uri.host, uri.port, **http_opts(uri)) {|http| http.request(req) } end rescue => e # rescue all StandardErrors # server didn't respond log.warn "Net::HTTP.#{req.method.capitalize} raises exception: #{e.class}, '#{e.message}'" raise e if @raise_on_error else unless res and res.is_a?(Net::HTTPSuccess) res_summary = if res "#{res.code} #{res.message} #{res.body}" else "res=nil" end log.warn "failed to #{req.method} #{uri} (#{res_summary})" end #end unless end # end begin end |
#set_body(req, tag, time, record) ⇒ Object
92 93 94 95 |
# File 'lib/fluent/plugin/out_loki.rb', line 92 def set_body(req, tag, time, record) set_json_body(req, record) req end |
#set_header(req, tag, time, record) ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/fluent/plugin/out_loki.rb', line 97 def set_header(req, tag, time, record) if @tenant req["X-Scope-OrgID"] = @tenant end if @custom_headers @custom_headers.each do |k,v| req[k] = v end req else req end end |
#set_json_body(req, data) ⇒ Object
111 112 113 114 |
# File 'lib/fluent/plugin/out_loki.rb', line 111 def set_json_body(req, data) req.body = @encoder.dump(data) req['Content-Type'] = 'application/json' end |
#shutdown ⇒ Object
84 85 86 |
# File 'lib/fluent/plugin/out_loki.rb', line 84 def shutdown super end |
#start ⇒ Object
80 81 82 |
# File 'lib/fluent/plugin/out_loki.rb', line 80 def start super end |
#write(chunk) ⇒ Object
219 220 221 222 223 224 225 |
# File 'lib/fluent/plugin/out_loki.rb', line 219 def write(chunk) tag = chunk..tag @endpoint_url = extract_placeholders(@endpoint_url, chunk.) chunk.msgpack_each do |time, record| handle_record(tag, time, record) end end |