Class: LogStash::Outputs::Bcdb

Inherits:
Base
  • Object
show all
Includes:
PluginMixins::HttpClient
Defined in:
lib/logstash/outputs/bcdb.rb

Defined Under Namespace

Classes: RetryTimerTask

Constant Summary collapse

# Lower-case HTTP verb names. Frozen so the shared constant cannot be
# mutated by accident at runtime.
# NOTE(review): presumably the set of values accepted for the http_method
# setting — confirm against the config declaration.
VALID_METHODS =
%w[put post patch delete get head].freeze
# Manticore exception classes listed as retryable. Frozen so the shared
# constant cannot be mutated by accident at runtime.
# NOTE(review): presumably consulted by retryable_exception? — confirm.
RETRYABLE_MANTICORE_EXCEPTIONS =
[
  ::Manticore::Timeout,
  ::Manticore::SocketException,
  ::Manticore::ClientProtocolException,
  ::Manticore::ResolutionFailure,
  ::Manticore::SocketTimeout
].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#is_batchObject

Returns the value of attribute is_batch.



16
17
18
# File 'lib/logstash/outputs/bcdb.rb', line 16

# Reader for @is_batch. `register` sets it to true when @format is
# "json_batch" and to false otherwise.
#
# @return [Boolean, nil] the current value of @is_batch
def is_batch
  @is_batch
end

Instance Method Details

#bcdb_authoriseObject



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/logstash/outputs/bcdb.rb', line 107

# Makes sure a usable OAuth bearer token is held for BCDB requests.
#
# When no token is cached, or the recorded expiry has passed, the configured
# credentials are posted as a form to @auth_url. On success the token is
# stored in @token_oauth, @headers["Authorization"] is set to
# "Bearer <token>", and @expires_token is set to now + "expires_in" seconds.
#
# Errors raised while talking to the auth endpoint are logged and the request
# is retried every 2 seconds until the server answers. The retry is a loop
# rather than recursion so a long outage cannot exhaust the stack, and the
# outcome of the final attempt is what gets returned.
#
# @return [Boolean] true when a valid token is held, false when the server
#   answered but did not hand out a token
# @raise [URI::InvalidURIError] when @auth_url cannot be parsed
def bcdb_authorise
  auth_uri = URI.parse(@auth_url)
  auth_data = {
    :username => @username,
    :password => @password,
    :client_id => @client_id,
    :client_secret => @client_secret,
    :grant_type => @grant_type
  }

  loop do
    begin
      # Cached token is still within its lifetime: nothing to do.
      return true if @token_oauth && @expires_token && Time.now.utc < @expires_token

      https = Net::HTTP.new(auth_uri.host, auth_uri.port)
      https.use_ssl = auth_uri.scheme == 'https'

      request = Net::HTTP::Post.new(auth_uri.path)
      request.set_form_data(auth_data)
      request['Content-Type'] = "application/x-www-form-urlencoded"
      resp = https.request(request)

      # An unparsable body is treated as "no token" instead of an error.
      bcdb_response =
        begin
          JSON.parse(resp.body)
        rescue JSON::ParserError, TypeError
          {}
        end

      # Net::HTTP reports the status code as a String.
      if resp.code == "200" && bcdb_response.is_a?(Hash) && bcdb_response['access_token']
        @token_oauth = bcdb_response['access_token']
        @headers["Authorization"] = "Bearer #{@token_oauth}"
        @expires_token = Time.now.utc + bcdb_response['expires_in'].to_i
        return true
      end

      @logger.error("Authentification failed please check your credentials")
      return false
    rescue => e
      # Typically a connection-level failure; log it, back off, try again.
      @logger.error("Error Makeing Authorization Request to BCDB",
        :class => e.class.name,
        :message => e.message,
        :backtrace => e.backtrace)
      sleep(2)
    end
  end
end

#bcdb_update_schema(data, cached_keys = false) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/logstash/outputs/bcdb.rb', line 149

# Creates or updates the JSON schema of the configured BCDB entity.
#
# Builds one property entry per element of +data+ and sends the resulting
# schema document to @create_schema_url. With +cached_keys+ set the schema is
# sent with PUT straight away; otherwise it is POSTed first and re-sent with
# PUT when the response body carries code 4009 or 4000.
# NOTE(review): 4009/4000 presumably mean "schema already exists" — confirm
# against the BCDB API documentation.
#
# @param data [Enumerable] property names to declare in the schema
# @param cached_keys [Boolean] true to skip the POST and PUT directly
# @return [Array] the two-element array [data, true]
def bcdb_update_schema(data, cached_keys=false)
  bcdb_authorise()
  schema_uri = URI.parse(@create_schema_url)

  schema_properties = {}
  data.each do |key|
    # The id and title are derived from the property name itself.
    schema_properties["#{key}"] = {
      :"$id" => "/properties/#{key}",
      :type => ["string", "object", "array"],
      :title => "The #{key} Schema"
    }
  end

  schema_data = {
    :type => "object",
    :"$id" => "http://example.com/#{@bcdb_entity}.json",
    :"$schema" => "http://json-schema.org/draft-07/schema#",
    :title => "The Root Schema",
    :properties => schema_properties,
    :autoId => true
  }
  body = JSON.generate(schema_data)

  if cached_keys
    bcdb_url(schema_uri, 'put', body)
  else
    response = bcdb_url(schema_uri, 'post', body)
    code =
      begin
        JSON.parse(response.body)["code"]
      rescue StandardError
        @logger.error("[BCDB SCHEMA] Unexpected error")
        nil
      end

    # Net::HTTP exposes the status as a String, so normalise before comparing.
    if response.code.to_i == 403
      @logger.error("Authentification failed please check your credentials")
    elsif %w[4009 4000].include?(code.to_s)
      # Accepts the body code whether the server sends it as number or string.
      bcdb_url(schema_uri, 'put', body)
    end
  end

  return data, true
end

#bcdb_url(uri, type, body) ⇒ Object



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/logstash/outputs/bcdb.rb', line 183

# Sends a JSON body to +uri+ with the current bearer token and returns the
# raw Net::HTTP response.
#
# @param uri [URI] target address; TLS is enabled when its scheme is "https"
# @param type [String] 'post' or 'put'
# @param body [String] request body, sent as application/json
# @return [Net::HTTPResponse] the response object returned by Net::HTTP
# @raise [ArgumentError] when +type+ is neither 'post' nor 'put'
def bcdb_url(uri,type,body)
  request =
    case type
    when 'post' then Net::HTTP::Post.new(uri.path)
    when 'put'  then Net::HTTP::Put.new(uri.path)
    else
      # Fail loudly instead of tripping over a nil request further down.
      raise ArgumentError, "Unsupported HTTP method #{type.inspect}, expected 'post' or 'put'"
    end
  request.body = body
  request['Content-Type'] = "application/json"
  request['authorization'] = "Bearer #{@token_oauth}"

  connection = Net::HTTP.new(uri.host, uri.port)
  connection.use_ssl = uri.scheme == 'https'
  connection.request(request)
end

#closeObject



389
390
391
392
# File 'lib/logstash/outputs/bcdb.rb', line 389

# Shuts the output down: cancels @timer first, then closes the HTTP client.
# NOTE(review): `client` is not defined in this view — presumably supplied by
# the included PluginMixins::HttpClient; confirm.
#
# @return [Object] whatever client.close returns
def close
  @timer.cancel
  client.close
end

#log_error_response(response, url, event) ⇒ Object



263
264
265
266
267
268
269
270
# File 'lib/logstash/outputs/bcdb.rb', line 263

# Reports a non-retryable HTTP response through log_failure.
#
# @param response [#code] the response whose status code is reported
# @param url [String] the URL the request was sent to
# @param event [Object] the event (or batch) that was being delivered
# @return [Object] whatever log_failure returns
def log_error_response(response, url, event)
  status = response.code
  log_failure("Encountered non-2xx HTTP code #{status}",
              :response_code => status,
              :url => url,
              :event => event)
end

#log_retryable_response(response) ⇒ Object



255
256
257
258
259
260
261
# File 'lib/logstash/outputs/bcdb.rb', line 255

# Logs a response that is going to be retried.
#
# A 429 is only mentioned at debug level; every other code is logged as a
# warning together with the response body.
#
# @param response [#code, #body] the retryable response
# @return [Object] the logger call's result, or false when debug is disabled
def log_retryable_response(response)
  unless response.code == 429
    return @logger.warn("Encountered a retryable HTTP request in HTTP output, will retry",
                        :code => response.code,
                        :body => response.body)
  end

  @logger.debug? && @logger.debug("Encountered a 429 response, will retry. This is not serious, just flow control via HTTP")
end

#multi_receive(events) ⇒ Object

def register



237
238
239
240
# File 'lib/logstash/outputs/bcdb.rb', line 237

# Entry point for a batch of events: hands a non-empty batch to send_events
# and does nothing for an empty one.
#
# @param events [Array] the events to deliver
# @return [Object, nil] the result of send_events, or nil for an empty batch
def multi_receive(events)
  send_events(events) unless events.empty?
end

#registerObject



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/logstash/outputs/bcdb.rb', line 199

# Prepares the plugin instance before any event is sent: normalises the
# configured HTTP method, derives the content type and target URLs from the
# format, authorises against BCDB and creates the retry timer.
#
# @return [Object] the timer stored in @timer
def register
  @http_method = @http_method.to_sym

  # Pool of request tokens, pre-filled with @pool_max entries.
  # NOTE(review): intended as backpressure — a sender would take a token and
  # return it when done; nothing in this view consumes the queue, confirm.
  @request_tokens = SizedQueue.new(@pool_max)
  @pool_max.times { @request_tokens << true }

  @requests = []

  # Only derive a content type when none was configured explicitly.
  if @content_type.nil?
    @content_type =
      case @format
      when "form"               then "application/x-www-form-urlencoded"
      when "json", "json_batch" then "application/json"
      when "message"            then "text/plain"
      end
  end

  @is_batch = (@format == "json_batch")
  @headers["Content-Type"] = @content_type

  validate_format!
  bcdb_authorise()

  @create_schema_url = "#{@base_url}/data/catalog/_JsonSchema/#{@bcdb_entity}"
  data_path = @format == "json_batch" ? "/data/bulkAsync/" : "/data/"
  @url = "#{@base_url}#{data_path}#{@bcdb_entity}"

  # Named timer, created as a daemon thread (second argument true).
  @timer = java.util.Timer.new("HTTP Output #{self.params['id']}", true)
end

#send_event(event, attempt) ⇒ Object



340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
# File 'lib/logstash/outputs/bcdb.rb', line 340

# Delivers one event (or, in batch mode, one array of events) and classifies
# the outcome.
#
# The token is refreshed first, then the body, URL and headers are prepared
# (gzip-compressed when @http_compression is true) and the request is issued
# through the HTTP client. Any exception raised on the way is logged through
# log_failure and mapped to :retry or :failure via retryable_exception?.
#
# @param event [Object] the event or batch to send
# @param attempt [Integer] the attempt counter, passed through unchanged
# @return [Array] [action, event, attempt] where action is :success, :retry
#   or :failure
def send_event(event, attempt)
  bcdb_authorise()

  body = event_body(event)

  # Batch requests use the static URL and headers; single events get theirs
  # resolved per event.
  if @is_batch
    url = @url
    headers = @headers
  else
    url = event.sprintf(@url)
    headers = event_headers(event)
  end

  # Compress the payload and announce it in the headers.
  if @http_compression == true
    headers["Content-Encoding"] = "gzip"
    body = gzip(body)
  end

  # Build the request and execute it immediately.
  response = client.send(@http_method, url, :body => body, :headers => headers).call
  @logger.debug("[MAKEING REQUEST] Url: #{url}, response  #{response.inspect}")

  return [:success, event, attempt] if response_success?(response)

  if retryable_response?(response)
    log_retryable_response(response)
    return [:retry, event, attempt]
  end

  log_error_response(response, url, event)
  [:failure, event, attempt]
rescue => exception
  will_retry = retryable_exception?(exception)
  log_failure("Could not fetch URL",
              :url => url,
              :method => @http_method,
              :body => body,
              :headers => headers,
              :message => exception.message,
              :class => exception.class.name,
              :backtrace => exception.backtrace,
              :will_retry => will_retry
  )

  outcome = will_retry ? :retry : :failure
  [outcome, event, attempt]
end

#send_events(events) ⇒ Object



272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
# File 'lib/logstash/outputs/bcdb.rb', line 272

# Sends a batch of events and blocks until each unit of work has ended in
# either success or failure.
#
# In batch mode the whole +events+ array is one unit of work; otherwise every
# event is its own unit. Work items travel through a queue as
# [payload, attempt] pairs, and a :done marker on that queue ends the loop.
#
# @param events [Array] the events to deliver
# @return [nil] once the :done marker has been consumed
# @raise [StandardError] any error raised while processing is logged and
#   re-raised
def send_events(events)
  # Outcome counters.
  successes = java.util.concurrent.atomic.AtomicInteger.new(0)
  failures  = java.util.concurrent.atomic.AtomicInteger.new(0)
  retries = java.util.concurrent.atomic.AtomicInteger.new(0)
  # Number of terminal outcomes (success or failure) to wait for.
  event_count = @is_batch ? 1 : events.size

  # Seed the work queue; every item starts at attempt 0.
  pending = Queue.new
  if @is_batch
    pending << [events, 0]
  else
    events.each {|e| pending << [e, 0]}
  end

  # Queue#pop blocks while the queue is empty, so this waits for retries
  # that are scheduled on the timer below.
  while popped = pending.pop
    break if popped == :done

    event, attempt = popped

    action, event, attempt = send_event(event, attempt)
    begin
      # With @retry_failed off, a retryable outcome is counted as a failure.
      action = :failure if action == :retry && !@retry_failed

      case action
      when :success
        successes.incrementAndGet
      when :retry
        retries.incrementAndGet

        next_attempt = attempt+1
        sleep_for = sleep_for_attempt(next_attempt)
        @logger.info("Retrying http request, will sleep for #{sleep_for} seconds")
        # NOTE(review): RetryTimerTask is defined outside this view —
        # presumably it pushes [event, next_attempt] back onto `pending`
        # when it fires; confirm.
        timer_task = RetryTimerTask.new(pending, event, next_attempt)
        # sleep_for is in seconds; the delay handed to the timer is sleep_for*1000.
        @timer.schedule(timer_task, sleep_for*1000)
      when :failure
        failures.incrementAndGet
      else
        raise "Unknown action #{action}"
      end

      # Once every unit of work has a terminal outcome, signal the loop to stop.
      if action == :success || action == :failure
        if successes.get+failures.get == event_count
          pending << :done
        end
      end
    rescue => e
      # This should never happen unless there's a flat out bug in the code
      @logger.error("Error sending HTTP Request",
        :class => e.class.name,
        :message => e.message,
        :backtrace => e.backtrace)
      failures.incrementAndGet
      raise e
    end
  end
rescue => e
  # Covers errors raised anywhere in the method, including those re-raised
  # by the inner rescue above.
  @logger.error("Error in http output loop",
          :class => e.class.name,
          :message => e.message,
          :backtrace => e.backtrace)
  raise e
end

#sleep_for_attempt(attempt) ⇒ Object



334
335
336
337
338
# File 'lib/logstash/outputs/bcdb.rb', line 334

# Computes the back-off delay for a retry attempt.
#
# The base delay is the square of the attempt number, capped at 60. The
# result is half of that base plus half of a random value between 0 and the
# base (integer division for integer attempts).
#
# @param attempt [Integer] the attempt number
# @return [Integer] the delay; send_events logs it as seconds
def sleep_for_attempt(attempt)
  capped = attempt**2
  capped = 60 unless capped <= 60

  fixed_part = capped / 2
  jitter = rand(0..capped) / 2
  fixed_part + jitter
end