Class: Fluent::LogzioOutputBuffered

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_logzio_buffered.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



13
14
15
16
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 13

# Applies the plugin configuration.
#
# Delegates to the parent class with the same +conf+ object and then
# writes the value of @endpoint_url to the debug log.
#
# @param conf the configuration object handed over by the caller
def configure(conf)
  super(conf)
  $log.debug("Logzio url #{@endpoint_url}")
end

#format(tag, time, record) ⇒ Object



31
32
33
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 31

# Serializes a single event for buffering.
#
# The tag, time and record are packed together as one msgpack-encoded
# three-element array.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return the result of calling to_msgpack on [tag, time, record]
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end

#shutdown ⇒ Object



27
28
29
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 27

# Stops the output. This plugin adds no teardown of its own; the call is
# simply forwarded to the parent class.
def shutdown
  super()
end

#start ⇒ Object



18
19
20
21
22
23
24
25
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 18

# Starts the output after the parent class has started.
#
# Loads net/http/persistent on demand, parses @endpoint_url into @uri and
# creates the HTTP client stored in @http. Every request sent through that
# client carries a 'text/plain' Content-Type header.
#
# NOTE(review): the second constructor argument (:ENV) is presumably the
# proxy setting of the positional net-http-persistent 2.x API - confirm
# against the gem version in use.
def start
  super()
  require 'net/http/persistent'
  @uri = URI(@endpoint_url)
  @http = Net::HTTP::Persistent.new('fluent-plugin-logzio', :ENV)
  @http.headers['Content-Type'] = 'text/plain'
  $log.debug('Started logzio shipper..')
end

#write(chunk) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 35

# Ships one buffered chunk to the endpoint in @uri as a single bulk POST.
#
# Each record in the chunk is optionally stamped with '@timestamp'
# (when @output_include_time is set) and 'fluentd_tags' (when
# @output_include_tags is set), without overwriting values the record
# already has. Records are serialized to JSON and joined with "\n".
#
# If the first response is not HTTP 200, the same request is re-sent up to
# @retry_count times, sleeping 2 seconds before the first retry and doubling
# the delay after every failed attempt. Retrying stops at the first HTTP 200.
#
# Exceptions raised while sending are rescued and logged; they are not
# propagated to the caller.
#
# @param chunk an object responding to msgpack_each, yielding tag, time, record
def write(chunk)
  records = []

  chunk.msgpack_each do |tag, time, record|
    record['@timestamp'] ||= Time.at(time).iso8601(3) if @output_include_time
    record['fluentd_tags'] ||= tag.to_s if @output_include_tags
    records.push(record.to_json)
  end

  $log.debug "Got flush timeout, containing #{records.length} chunks"

  # The bulk body is one JSON document per line, "\n"-delimited.
  post = Net::HTTP::Post.new @uri.request_uri
  post.body = records.join("\n")

  begin
    response = @http.request @uri, post
    $log.debug "HTTP Response code #{response.code}"

    # response.code is a String, so it must be compared with '200', not 200.
    return if response.code == '200'

    $log.debug "Got HTTP #{response.code} from logz.io, not giving up just yet"

    sleep_interval = 2
    @retry_count.times do |counter|
      $log.debug "Sleeping for #{sleep_interval} seconds, and trying again."
      sleep(sleep_interval)

      response = @http.request @uri, post

      if response.code == '200'
        $log.debug "Successfuly sent the failed bulk."
        break
      end

      # Back off exponentially before the next attempt.
      sleep_interval *= 2

      if counter == @retry_count - 1
        $log.error "Could not send your bulk after #{@retry_count} tries. Sorry. Got HTTP #{response.code}"
      end
    end
  rescue StandardError => e
    # Keep the best-effort behaviour, but report what actually went wrong.
    $log.error "Error connecting to logzio. verify the url #{@endpoint_url} (#{e.class}: #{e.message})"
  end
end