Class: Fluent::Plugin::HTTPOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_http.rb

Defined Under Namespace

Classes: RecoverableResponse

Constant Summary collapse

DEFAULT_BUFFER_TYPE =
"memory"
DEFAULT_FORMATTER =
"json"

Instance Method Summary collapse

Constructor Details

#initialize ⇒ HTTPOutput

Returns a new instance of HTTPOutput.



19
20
21
# File 'lib/fluent/plugin/out_http.rb', line 19

# Creates the plugin instance. No state is set up here; construction is
# delegated entirely to the parent class.
def initialize
  super
end

Instance Method Details

#bulk_request_format(tag, time, record) ⇒ Object



280
281
282
# File 'lib/fluent/plugin/out_http.rb', line 280

# Formats one event by delegating to the formatter stored in @formatter.
# `configure` aliases this method to `format` when @bulk_request is set.
#
# Returns whatever @formatter.format returns for (tag, time, record).
def bulk_request_format(tag, time, record)
  @formatter.format(tag, time, record)
end

#compress_body(req, data) ⇒ Object



149
150
151
152
153
154
155
156
# File 'lib/fluent/plugin/out_http.rb', line 149

# Gzips +data+ into the request body when @compress_request is set.
#
# Returns nil without touching +req+ when compression is disabled.
# Otherwise sets the Content-Encoding header to "gzip", replaces req.body
# with the gzipped bytes and returns that string.
def compress_body(req, data)
  return unless @compress_request

  buffer = StringIO.new
  writer = Zlib::GzipWriter.new(buffer)
  writer << data
  writer.close # finishes the gzip stream; the bytes stay readable via buffer.string

  req['Content-Encoding'] = "gzip"
  req.body = buffer.string
end

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/fluent/plugin/out_http.rb', line 80

# Applies the plugin configuration.
#
# After the parent class has processed +conf+, this:
# * derives @ssl_verify_mode from @ssl_no_verify,
# * copies @cacert_file into @ca_file and clears @last_request_time,
# * builds a formatter when +conf+ contains a 'format' element,
# * aliases +format+ on this instance to the bulk or per-record variant.
#
# Raises Fluent::ConfigError when @buffered is set but @chunk_key_tag is not.
def configure(conf)
  compat_parameters_convert(conf, :buffer, :formatter)
  super

  @ssl_verify_mode = @ssl_no_verify ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER
  @ca_file = @cacert_file
  @last_request_time = nil

  if @buffered && !@chunk_key_tag
    raise Fluent::ConfigError, "'tag' in chunk_keys is required."
  end

  # The formatter is only created here when a 'format' element is present.
  @formatter_config = conf.elements('format').first
  @formatter = formatter_create if @formatter_config

  # The alias is installed on the singleton class, so it affects only this instance.
  if @bulk_request
    singleton_class.send(:alias_method, :format, :bulk_request_format)
    @formatter = formatter_create(type: :json)
    @serializer = :x_ndjson # secret settings for bulk_request
  else
    singleton_class.send(:alias_method, :format, :split_request_format)
  end
end

#create_request(tag, time, record) ⇒ Object



182
183
184
185
186
187
188
189
# File 'lib/fluent/plugin/out_http.rb', line 182

# Builds the Net::HTTP request object for one payload.
#
# The URL comes from format_url; the request class is looked up on Net::HTTP
# from @http_method (e.g. :post -> Net::HTTP::Post). The body and headers are
# then filled in by set_body and set_header.
#
# Returns a two-element array: [request, parsed URI].
def create_request(tag, time, record)
  uri = URI.parse(format_url(tag, time, record))
  request_class = Net::HTTP.const_get(@http_method.to_s.capitalize)
  req = request_class.new(uri.request_uri)
  set_body(req, tag, time, record)
  set_header(req, tag, time, record)
  [req, uri]
end

#format(tag, time, record) ⇒ Object



272
273
274
# File 'lib/fluent/plugin/out_http.rb', line 272

# Placeholder kept for safety: `configure` aliases `format` on the instance to
# either bulk_request_format or split_request_format. Until that happens this
# method ignores its arguments and returns nil.
def format(tag, time, record)
  nil
end

#format_url(tag, time, record) ⇒ Object



119
120
121
# File 'lib/fluent/plugin/out_http.rb', line 119

# Returns the URL requests are sent to. The tag, time and record arguments are
# accepted but not used; the value is simply @endpoint_url.
def format_url(tag, time, record)
  @endpoint_url
end

#formatted_to_msgpack_binary? ⇒ Boolean

Returns:

  • (Boolean)


284
285
286
287
288
289
290
# File 'lib/fluent/plugin/out_http.rb', line 284

# Reports whether formatted events are msgpack binary.
#
# Returns false when @bulk_request is set (events go through the formatter)
# and true otherwise (events are serialized with to_msgpack).
def formatted_to_msgpack_binary?
  !@bulk_request
end

#handle_record(tag, time, record) ⇒ Object

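Formats a single record with the configured formatter (when a format section is present), builds the HTTP request for it, and sends it.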



255
256
257
258
259
260
261
# File 'lib/fluent/plugin/out_http.rb', line 255

# Sends a single event as one HTTP request.
#
# When a format section was configured (@formatter_config), the record is
# first run through @formatter. The request is then built with create_request
# and handed to send_request, whose result is returned.
def handle_record(tag, time, record)
  record = @formatter.format(tag, time, record) if @formatter_config
  request, target = create_request(tag, time, record)
  send_request(request, target)
end

#handle_records(tag, time, chunk) ⇒ Object



263
264
265
266
# File 'lib/fluent/plugin/out_http.rb', line 263

# Sends an entire chunk as a single HTTP request.
#
# The chunk's full content (chunk.read) becomes the payload handed to
# create_request; the resulting request is passed to send_request.
def handle_records(tag, time, chunk)
  payload = chunk.read
  request, target = create_request(tag, time, payload)
  send_request(request, target)
end

#http_opts(uri) ⇒ Object



191
192
193
194
195
196
197
198
199
200
# File 'lib/fluent/plugin/out_http.rb', line 191

# Builds the option hash that send_request splats into Net::HTTP.start.
#
# * :use_ssl     - true when the URI scheme is 'https'.
# * :verify_mode - @ssl_verify_mode, only when SSL is in use.
# * :ca_file     - @ca_file, when it names an existing file.
# * :cert        - certificate loaded from @client_cert_path, when that file exists.
# * :key         - private key loaded from @private_key_path (decrypted with
#                  @private_key_passphrase), when that file exists.
#
# Unset (nil) or empty path settings are skipped rather than raising.
#
# Returns the options Hash.
def http_opts(uri)
  opts = { use_ssl: uri.scheme == 'https' }
  opts[:verify_mode] = @ssl_verify_mode if opts[:use_ssl]

  # File.file?(nil) raises TypeError; `to_s` maps an unset path to "",
  # which File.file? simply reports as not being a file.
  opts[:ca_file] = @ca_file if File.file?(@ca_file.to_s)
  if File.file?(@client_cert_path.to_s)
    opts[:cert] = OpenSSL::X509::Certificate.new(File.read(@client_cert_path))
  end
  if File.file?(@private_key_path.to_s)
    opts[:key] = OpenSSL::PKey.read(File.read(@private_key_path), @private_key_passphrase)
  end
  opts
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


292
293
294
# File 'lib/fluent/plugin/out_http.rb', line 292

# Always returns true.
# NOTE(review): presumably this tells the framework the plugin may run under
# multiple workers -- confirm against the parent Output class.
def multi_workers_ready?
  true
end

#prefer_buffered_processing ⇒ Object



268
269
270
# File 'lib/fluent/plugin/out_http.rb', line 268

# Returns the value of @buffered.
# NOTE(review): presumably the parent class uses this to choose between the
# buffered path (`write`) and the direct path (`process`) -- confirm.
def prefer_buffered_processing
  @buffered
end

#process(tag, es) ⇒ Object



296
297
298
299
300
# File 'lib/fluent/plugin/out_http.rb', line 296

# Sends every event in the stream +es+ immediately, one HTTP request per event,
# by calling handle_record for each (time, record) pair.
#
# Returns whatever es.each returns.
def process(tag, es)
  es.each { |time, record| handle_record(tag, time, record) }
end

#proxies ⇒ Object



202
203
204
# File 'lib/fluent/plugin/out_http.rb', line 202

# Returns the proxy URL taken from the environment, or nil when none is set.
#
# Variables are consulted in this order and the first one that is set wins:
# HTTPS_PROXY, HTTP_PROXY, http_proxy, https_proxy.
def proxies
  ENV.values_at('HTTPS_PROXY', 'HTTP_PROXY', 'http_proxy', 'https_proxy').compact.first
end

#send_request(req, uri) ⇒ Object



206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# File 'lib/fluent/plugin/out_http.rb', line 206

# Sends +req+ to the host/port of +uri+ and handles the outcome.
#
# Rate limiting: when @rate_limit_msec is non-zero and fewer than that many
# milliseconds have passed since the previous request, the request is dropped
# (an info message is logged) and nil is returned.
#
# Authentication: depending on @authentication, basic auth credentials or a
# "bearer"/"jwt" authorization header is added to +req+ before sending.
#
# Proxy: when `proxies` returns a URL, the request is sent through that proxy.
#
# Outcome:
# * An exception while sending is logged as a warning and re-raised only when
#   @raise_on_error is set.
# * A non-success response whose status code is listed in
#   @recoverable_status_codes raises RecoverableResponse.
# * Any other non-success result (including a nil response) is logged as a
#   warning.
def send_request(req, uri)
  if @rate_limit_msec != 0 && !@last_request_time.nil?
    elapsed_msec = (Time.now.to_f - @last_request_time) * 1000.0
    if elapsed_msec < @rate_limit_msec
      log.info('Dropped request due to rate limiting')
      return
    end
  end

  res = nil

  begin
    case @authentication
    when :basic
      req.basic_auth(@username, @password)
    when :bearer
      req['authorization'] = "bearer #{@token}"
    when :jwt
      req['authorization'] = "jwt #{@token}"
    end
    @last_request_time = Time.now.to_f

    if (proxy = proxies)
      proxy_uri = URI.parse(proxy)

      res = Net::HTTP.start(uri.host, uri.port,
                            proxy_uri.host, proxy_uri.port, proxy_uri.user, proxy_uri.password,
                            **http_opts(uri)) { |http| http.request(req) }
    else
      res = Net::HTTP.start(uri.host, uri.port, **http_opts(uri)) { |http| http.request(req) }
    end
  rescue => e # rescue all StandardErrors
    # server didn't respond
    log.warn "Net::HTTP.#{req.method.capitalize} raises exception: #{e.class}, '#{e.message}'"
    raise e if @raise_on_error
  else
    # This branch is outside the rescue above, so RecoverableResponse raised
    # here propagates to the caller.
    unless res.is_a?(Net::HTTPSuccess)
      res_summary = res ? "#{res.code} #{res.message} #{res.body}" : "res=nil"
      # Guard on `res`: a nil response has no status code to look up.
      if res && @recoverable_status_codes.include?(res.code.to_i)
        raise RecoverableResponse, res_summary
      else
        log.warn "failed to #{req.method} #{uri} (#{res_summary})"
      end
    end
  end
end

#set_body(req, tag, time, record) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/fluent/plugin/out_http.rb', line 123

# Fills in the body of +req+ according to @serializer.
#
# :json, :text, :raw and :x_ndjson dispatch to the matching set_*_body helper;
# any other value sends +record+ as form data via req.set_form_data.
#
# Returns +req+.
def set_body(req, tag, time, record)
  case @serializer
  when :json     then set_json_body(req, record)
  when :text     then set_text_body(req, record)
  when :raw      then set_raw_body(req, record)
  when :x_ndjson then set_bulk_body(req, record)
  else req.set_form_data(record)
  end
  req
end

#set_bulk_body(req, data) ⇒ Object



176
177
178
179
180
# File 'lib/fluent/plugin/out_http.rb', line 176

# Sets +req+ up for a bulk (newline-delimited JSON) payload: the body is
# data.to_s and the Content-Type is application/x-ndjson. The body is then
# passed through compress_body, whose result is returned.
def set_bulk_body(req, data)
  payload = data.to_s
  req['Content-Type'] = 'application/x-ndjson'
  req.body = payload
  compress_body(req, req.body)
end

#set_header(req, tag, time, record) ⇒ Object



138
139
140
141
142
143
144
145
146
147
# File 'lib/fluent/plugin/out_http.rb', line 138

# Copies every key/value pair of @custom_headers (when set) onto +req+ as
# request headers. The tag, time and record arguments are not used.
#
# Returns +req+.
def set_header(req, tag, time, record)
  @custom_headers.each { |name, value| req[name] = value } if @custom_headers
  req
end

#set_json_body(req, data) ⇒ Object



158
159
160
161
162
# File 'lib/fluent/plugin/out_http.rb', line 158

# Sets +req+ up for a JSON payload: the body is +data+ serialized with
# Yajl.dump and the Content-Type is application/json. The body is then passed
# through compress_body, whose result is returned.
def set_json_body(req, data)
  encoded = Yajl.dump(data)
  req['Content-Type'] = 'application/json'
  req.body = encoded
  compress_body(req, req.body)
end

#set_raw_body(req, data) ⇒ Object



170
171
172
173
174
# File 'lib/fluent/plugin/out_http.rb', line 170

# Sets +req+ up for a raw payload: the body is data.to_s and the Content-Type
# is application/octet-stream. The body is then passed through compress_body,
# whose result is returned.
def set_raw_body(req, data)
  payload = data.to_s
  req['Content-Type'] = 'application/octet-stream'
  req.body = payload
  compress_body(req, req.body)
end

#set_text_body(req, data) ⇒ Object



164
165
166
167
168
# File 'lib/fluent/plugin/out_http.rb', line 164

# Sets +req+ up for a plain-text payload: the body is the record's "message"
# value and the Content-Type is text/plain. The body is then passed through
# compress_body, whose result is returned.
def set_text_body(req, data)
  message = data["message"]
  req['Content-Type'] = 'text/plain'
  req.body = message
  compress_body(req, req.body)
end

#shutdown ⇒ Object



115
116
117
# File 'lib/fluent/plugin/out_http.rb', line 115

# Shutdown hook. Adds no behavior of its own; delegates to the parent class.
def shutdown
  super
end

#split_request_format(tag, time, record) ⇒ Object



276
277
278
# File 'lib/fluent/plugin/out_http.rb', line 276

# Serializes one event as the msgpack encoding of the pair [time, record].
# The tag is not included. `configure` aliases this method to `format` when
# @bulk_request is not set.
def split_request_format(tag, time, record)
  [time, record].to_msgpack
end

#start ⇒ Object



111
112
113
# File 'lib/fluent/plugin/out_http.rb', line 111

# Start hook. Adds no behavior of its own; delegates to the parent class.
def start
  super
end

#write(chunk) ⇒ Object



302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
# File 'lib/fluent/plugin/out_http.rb', line 302

# Flushes one buffer chunk over HTTP.
#
# The chunk's tag is read from its metadata and the endpoint URL is expanded
# for this chunk with extract_placeholders. In bulk mode (@bulk_request) the
# whole chunk is sent as a single request via handle_records, stamped with
# Fluent::Engine.now; otherwise each (time, record) pair in the chunk is sent
# separately via handle_record.
def write(chunk)
  tag = chunk.metadata.tag

  # Expand placeholders from the originally configured URL every time.
  # Expanding @endpoint_url in place would bake the first chunk's values into
  # the URL and leave nothing to substitute for later chunks.
  @endpoint_url_template ||= @endpoint_url
  @endpoint_url = extract_placeholders(@endpoint_url_template, chunk)

  log.debug { "#{@http_method.capitalize} data to #{@endpoint_url} with chunk(#{dump_unique_id_hex(chunk.unique_id)})" }

  if @bulk_request
    time = Fluent::Engine.now
    handle_records(tag, time, chunk)
  else
    chunk.msgpack_each do |time, record|
      handle_record(tag, time, record)
    end
  end
end