Class: BcdbOut

Inherits:
Fluent::Plugin::Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_bcdb.rb

Defined Under Namespace

Classes: RecoverableResponse

Constant Summary collapse

DEFAULT_BUFFER_TYPE =
"memory"
DEFAULT_FORMATTER =
"json"

Instance Method Summary collapse

Constructor Details

#initializeBcdbOut

Returns a new instance of BcdbOut.



35
36
37
# File 'lib/fluent/plugin/out_bcdb.rb', line 35

def initialize
  super
end

Instance Method Details

#bcdb_authoriseObject



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/fluent/plugin/out_bcdb.rb', line 144

def bcdb_authorise()
    auth_uri = URI.parse(@auth_url)
    auth_data = {
        :username => @username,
        :password => @password,
        :client_id => @client_id,
        :client_secret => @client_secret,
        :grant_type => @grant_type
    }
    status = true
    begin
        unless (@token_oauth && (@expires_token && Time.now.utc > @expires_token))
            https= Net::HTTP.new(auth_uri.host,auth_uri.port)
            https.use_ssl = auth_uri.scheme == 'https'

            request = Net::HTTP::Post.new(auth_uri.path)
            request.set_form_data(auth_data)
            request['Content-Type'] = "application/x-www-form-urlencoded"
            resp = https.request(request)
            log.debug("#{resp.body}")
            bcdb_response = JSON.parse(resp.body)
            if bcdb_response["code"] == 5000
                status = false
                log.error("Authentification failed please check your credentials")
            else
                @token_oauth = bcdb_response['access_token']
                @expires_token = Time.now.utc + bcdb_response['expires_in'].to_i
            end
        end
    rescue => e
      # This should never happen unless there's a flat out issue with the network

      log.error("Error Makeing Authorization Request to BCDB. Error: #{e.message} | Backtrace: #{e.backtrace}")
      sleep(2)
      bcdb_authorise()
    end
    return status
end

#bcdb_update_schema(data, cached_keys = false) ⇒ Object



182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/fluent/plugin/out_bcdb.rb', line 182

def bcdb_update_schema(data, cached_keys=false)
    schema_uri = URI.parse(@create_schema_url)
    schema_properties = {}
    data.each do |key|
        log.debug("KEY #{key.inspect}")
        schema_properties["#{key}"] = {
            :"$id" => "/properties/#{schema_properties["#{key}"]}",
            :type => "string",
            :title => "The #{schema_properties["#{key}"]} Schema"
        }
    end
    schema_data = {
        :type => "object",
        :"$id" => @bcdb_entity,
        :"$schema" => "http://json-schema.org/draft-07/schema#",
        :title => "The Root Schema",
        :properties => schema_properties,
        :autoId => true
    }
    body = JSON(schema_data)

    if cached_keys
        request = bcdb_url(schema_uri,'put', body)
    else
        request = bcdb_url(schema_uri,'post',body)
        res = JSON.parse(request.body)["code"]
        if res == 4009 || res == 4000
            request = bcdb_url(schema_uri,'put', body)
        end
    end
   log.debug("UPDATE SCHEMA: #{body}")

   log.debug("UPDATE SCHEMA RESPONSE: #{request.body}")
   return data, true
end

#bcdb_url(uri, type, body) ⇒ Object



217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/fluent/plugin/out_bcdb.rb', line 217

def bcdb_url(uri,type,body)
    bcdb_request = Net::HTTP.new(uri.host,uri.port)
    bcdb_request.use_ssl = uri.scheme == 'https'
    case type
    when 'post'
        request = Net::HTTP::Post.new(uri.path)
    when 'put'
        request = Net::HTTP::Put.new(uri.path)
    end
    request.body = body
    request['Content-Type'] = "application/json"
    request['authorization'] = "Bearer #{@token_oauth}"
    response = bcdb_request.request(request)
    return response
end

#bulk_request_format(tag, time, record) ⇒ Object



432
433
434
# File 'lib/fluent/plugin/out_bcdb.rb', line 432

def bulk_request_format(tag, time, record)
  @formatter.format(tag, time, record)
end

#compress_body(req, data) ⇒ Object



271
272
273
274
275
276
277
278
# File 'lib/fluent/plugin/out_bcdb.rb', line 271

def compress_body(req, data)
  return unless @compress_request
  gz = Zlib::GzipWriter.new(StringIO.new)
  gz << data

  req['Content-Encoding'] = "gzip"
  req.body = gz.close.string
end

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/fluent/plugin/out_bcdb.rb', line 105

def configure(conf)
  compat_parameters_convert(conf, :buffer, :formatter)
  super
  @create_schema_url = "#{@base_url}" + "/data/catalog/_JsonSchema/" + "#{@bcdb_entity}"
  if @bulk_request
      @base_url =  "#{@base_url}" + "/data/bulkAsync/" + "#{@bcdb_entity}"
  else
      @base_url = "#{@base_url}" + "/data/" + "#{@bcdb_entity}"
  end

  bcdb_authorise() if @authentication == :oauth

  @ssl_verify_mode = if @ssl_no_verify
                       OpenSSL::SSL::VERIFY_NONE
                     else
                       OpenSSL::SSL::VERIFY_PEER
                     end

  @ca_file = @cacert_file
  @last_request_time = nil
  raise Fluent::ConfigError, "'tag' in chunk_keys is required." if !@chunk_key_tag && @buffered

  if @formatter_config = conf.elements('format').first
    @formatter = formatter_create
  end

  if @bulk_request
    class << self
      alias_method :format, :bulk_request_format
    end
    @formatter = formatter_create(type: :json)
    @serializer = :x_ndjson # secret settings for bulk_request

  else
    class << self
      alias_method :format, :split_request_format
    end
  end
end

#create_request(tag, time, record) ⇒ Object



330
331
332
333
334
335
336
337
338
# File 'lib/fluent/plugin/out_bcdb.rb', line 330

def create_request(tag, time, record)
  url = format_url(tag, time, record)
  uri = URI.parse(url)
  req = Net::HTTP.const_get(@http_method.to_s.capitalize).new(uri.request_uri)
  set_body(req, tag, time, record)
  set_header(req, tag, time, record)
  log.trace("CREATE REQUEST: #{req}, #{uri}")
  return req, uri
end

#format(tag, time, record) ⇒ Object



424
425
426
# File 'lib/fluent/plugin/out_bcdb.rb', line 424

def format(tag, time, record)
  # For safety.

end

#format_url(tag, time, record) ⇒ Object



241
242
243
# File 'lib/fluent/plugin/out_bcdb.rb', line 241

def format_url(tag, time, record)
  @base_url
end

#formatted_to_msgpack_binary?Boolean

Returns:

  • (Boolean)


436
437
438
439
440
441
442
# File 'lib/fluent/plugin/out_bcdb.rb', line 436

def formatted_to_msgpack_binary?
  if @bulk_request
    false
  else
    true
  end
end

#handle_record(tag, time, record) ⇒ Object

end send_request



407
408
409
410
411
412
413
# File 'lib/fluent/plugin/out_bcdb.rb', line 407

def handle_record(tag, time, record)
  if @formatter_config
    record = @formatter.format(tag, time, record)
  end
  req, uri = create_request(tag, time, record)
  send_request(req, uri)
end

#handle_records(tag, time, chunk) ⇒ Object



415
416
417
418
# File 'lib/fluent/plugin/out_bcdb.rb', line 415

def handle_records(tag, time, chunk)
  req, uri = create_request(tag, time, chunk.read)
  send_request(req, uri)
end

#http_opts(uri) ⇒ Object



340
341
342
343
344
345
346
347
348
349
# File 'lib/fluent/plugin/out_bcdb.rb', line 340

def http_opts(uri)
    opts = {
      :use_ssl => uri.scheme == 'https'
    }
    opts[:verify_mode] = @ssl_verify_mode if opts[:use_ssl]
    opts[:ca_file] = File.join(@ca_file) if File.file?(@ca_file)
    opts[:cert] = OpenSSL::X509::Certificate.new(File.read(@client_cert_path)) if File.file?(@client_cert_path)
    opts[:key] = OpenSSL::PKey.read(File.read(@private_key_path), @private_key_passphrase) if File.file?(@private_key_path)
    opts
end

#multi_workers_ready?Boolean

Returns:

  • (Boolean)


444
445
446
# File 'lib/fluent/plugin/out_bcdb.rb', line 444

def multi_workers_ready?
  true
end

#prefer_buffered_processingObject



420
421
422
# File 'lib/fluent/plugin/out_bcdb.rb', line 420

def prefer_buffered_processing
  @buffered
end

#process(tag, es) ⇒ Object



448
449
450
451
452
453
# File 'lib/fluent/plugin/out_bcdb.rb', line 448

def process(tag, es)
  log.trace("TRACE PROCESS: #{tag}, #{es}")
  es.each do |time, record|
    handle_record(tag, time, record)
  end
end

#proxiesObject



351
352
353
# File 'lib/fluent/plugin/out_bcdb.rb', line 351

def proxies
  ENV['HTTPS_PROXY'] || ENV['HTTP_PROXY'] || ENV['http_proxy'] || ENV['https_proxy']
end

#send_request(req, uri) ⇒ Object



355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
# File 'lib/fluent/plugin/out_bcdb.rb', line 355

def send_request(req, uri)
  is_rate_limited = (@rate_limit_msec != 0 and not @last_request_time.nil?)
  if is_rate_limited and ((Time.now.to_f - @last_request_time) * 1000.0 < @rate_limit_msec)
    log.info('Dropped request due to rate limiting')
    return
  end

  res = nil

  begin
    if @authentication == :basic
      req.basic_auth(@username, @password)
    elsif @authentication == :bearer
      req['authorization'] = "bearer #{@token}"
    elsif @authentication == :jwt
      req['authorization'] = "jwt #{@token}"
    elsif @authentication == :oauth
        req['authorization'] = "Bearer #{@token_oauth}"
    end
    @last_request_time = Time.now.to_f

    if proxy = proxies
      proxy_uri = URI.parse(proxy)

      res = Net::HTTP.start(uri.host, uri.port,
                            proxy_uri.host, proxy_uri.port, proxy_uri.user, proxy_uri.password,
                            **http_opts(uri)) {|http| http.request(req) }
    else
      res = Net::HTTP.start(uri.host, uri.port, **http_opts(uri)) {|http| http.request(req) }
      log.debug("REQUEST BODY: #{req.body}")
      log.debug("RESPONSE BODY: #{res.body}")
    end
  rescue => e # rescue all StandardErrors

    # server didn't respond

    log.warn "Net::HTTP.#{req.method.capitalize} raises exception: #{e.class}, '#{e.message}'"
    raise e if @raise_on_error
  else
     unless res and res.is_a?(Net::HTTPSuccess)
        res_summary = if res
                         "#{res.code} #{res.message} #{res.body}"
                      else
                         "res=nil"
                      end
        if @recoverable_status_codes.include?(res.code.to_i)
          raise RecoverableResponse, res_summary
        else
          log.warn "failed to #{req.method} #{uri} (#{res_summary})"
        end
     end #end unless

  end # end begin

end

#set_body(req, tag, time, record) ⇒ Object



245
246
247
248
249
250
251
252
253
254
255
256
257
258
# File 'lib/fluent/plugin/out_bcdb.rb', line 245

def set_body(req, tag, time, record)
  if @serializer == :json
    set_json_body(req, record)
  elsif @serializer == :text
    set_text_body(req, record)
  elsif @serializer == :raw
    set_raw_body(req, record)
  elsif @serializer == :x_ndjson
    set_bulk_body(req, record)
  else
    req.set_form_data(record)
  end
  req
end

#set_bulk_body(req, data) ⇒ Object



303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
# File 'lib/fluent/plugin/out_bcdb.rb', line 303

def set_bulk_body(req, data)
  bcdb_authorise()
  if data.is_a? String
      flat_keys = []
      bcdb_data = data.split("\n").map{ |x| JSON.parse(x) }
      bcdb_data.each do |data|
          flat_keys = flat_keys + data.keys
      end
      flat_keys.uniq!
      unless @cached_keys && @keys.sort == flat_keys.sort
          @keys, @cached_keys = bcdb_update_schema(flat_keys, @cached_keys)
      end
      data = { :records => bcdb_data }
      @base_url = "#{@base_url_}" + "/data/bulkAsync/" + "#{@bcdb_entity}"
  else
      log.debug("DATA: #{data.inspect}")
      unless @cached_keys && @keys.sort == data.keys.sort
          @keys, @cached_keys = bcdb_update_schema(data.keys, @cached_keys)
      end
      data = { :records => [data] }
  end
  req.body = Yajl.dump(data)
  # req['Content-Type'] = 'application/x-ndjson'

  req['Content-Type'] = 'application/json'
  compress_body(req, req.body)
end

#set_header(req, tag, time, record) ⇒ Object



260
261
262
263
264
265
266
267
268
269
# File 'lib/fluent/plugin/out_bcdb.rb', line 260

def set_header(req, tag, time, record)
  if @custom_headers
    @custom_headers.each do |k,v|
      req[k] = v
    end
    req
  else
    req
  end
end

#set_json_body(req, data) ⇒ Object



280
281
282
283
284
285
286
287
288
289
# File 'lib/fluent/plugin/out_bcdb.rb', line 280

def set_json_body(req, data)
  bcdb_authorise()
  unless @cached_keys && @keys.sort == data.keys.sort
      @keys, @cached_keys = bcdb_update_schema(data.keys, @cached_keys)
  end
  # data = { :records => [data] } if @bulk_request

  req.body = Yajl.dump(data)
  req['Content-Type'] = "application/json"
  compress_body(req, req.body)
end

#set_raw_body(req, data) ⇒ Object



297
298
299
300
301
# File 'lib/fluent/plugin/out_bcdb.rb', line 297

def set_raw_body(req, data)
  req.body = data.to_s
  req['Content-Type'] = 'application/octet-stream'
  compress_body(req, req.body)
end

#set_text_body(req, data) ⇒ Object



291
292
293
294
295
# File 'lib/fluent/plugin/out_bcdb.rb', line 291

def set_text_body(req, data)
  req.body = data["message"]
  req['Content-Type'] = 'text/plain'
  compress_body(req, req.body)
end

#shutdownObject



237
238
239
# File 'lib/fluent/plugin/out_bcdb.rb', line 237

def shutdown
  super
end

#split_request_format(tag, time, record) ⇒ Object



428
429
430
# File 'lib/fluent/plugin/out_bcdb.rb', line 428

def split_request_format(tag, time, record)
  [time, record].to_msgpack
end

#startObject



233
234
235
# File 'lib/fluent/plugin/out_bcdb.rb', line 233

def start
  super
end

#write(chunk) ⇒ Object



455
456
457
458
459
460
461
462
463
464
465
466
# File 'lib/fluent/plugin/out_bcdb.rb', line 455

def write(chunk)
  tag = chunk..tag
  @base_url = extract_placeholders(@base_url, chunk)
  if @bulk_request
    time = Fluent::Engine.now
    handle_records(tag, time, chunk)
  else
    chunk.msgpack_each do |time, record|
      handle_record(tag, time, record)
    end
  end
end