Class: Fluent::HTTPOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_http_ext.rb

Instance Method Summary collapse

Constructor Details

#initializeHTTPOutput

Returns a new instance of HTTPOutput.



25
26
27
28
29
30
# File 'lib/fluent/plugin/out_http_ext.rb', line 25

def initialize
  super
  require 'net/http'
  require 'uri'
  require 'yajl'
end

Instance Method Details

#configure(conf) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/fluent/plugin/out_http_ext.rb', line 57

def configure(conf)
  super

  serializers = [:json, :form]
  @serializer = if serializers.include? @serializer.intern
                  @serializer.intern
                else
                  :form
                end

  http_methods = [:get, :put, :post, :delete]
  @http_method = if http_methods.include? @http_method.intern
                  @http_method.intern
                else
                  :post
                end

  @auth = case @authentication
          when 'basic' then :basic
          else
            :none
          end
  @headers = {}
  conf.elements.each do |element|
    if element.name == 'headers'
      @headers = element.to_hash
    end
  end
end

#create_request(tag, time, record) ⇒ Object



130
131
132
133
134
135
136
137
# File 'lib/fluent/plugin/out_http_ext.rb', line 130

def create_request(tag, time, record)
  url = format_url(tag, time, record)
  uri = URI.parse(url)
  req = Net::HTTP.const_get(@http_method.to_s.capitalize).new(uri.path)
  set_body(req, tag, time, record)
  set_header(req, tag, time, record)
  return req, uri
end

#emit(tag, es, chain) ⇒ Object



180
181
182
183
184
185
# File 'lib/fluent/plugin/out_http_ext.rb', line 180

def emit(tag, es, chain)
  es.each do |time, record|
    handle_record(tag, time, record)
  end
  chain.next
end

#format_url(tag, time, record) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/fluent/plugin/out_http_ext.rb', line 95

def format_url(tag, time, record)
  '''
  replace format string to value
  example
    /test/<data> =(use {data: 1})> /test/1
    /test/<hash.data> =(use {hash:{data:2}})> /test/2
  '''
  result_url = @endpoint_url
  record.each_deep do |key_dir, value|
    result_url = result_url.gsub(/<#{key_dir.join(".")}>/, value.to_s)
  end
  return result_url
end

#handle_record(tag, time, record) ⇒ Object

end send_request



175
176
177
178
# File 'lib/fluent/plugin/out_http_ext.rb', line 175

def handle_record(tag, time, record)
  req, uri = create_request(tag, time, record)
  send_request(req, uri)
end

#send_request(req, uri) ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/fluent/plugin/out_http_ext.rb', line 139

def send_request(req, uri)
  is_rate_limited = (@rate_limit_msec != 0 and not @last_request_time.nil?)
  if is_rate_limited and ((Time.now.to_f - @last_request_time) * 1000.0 < @rate_limit_msec)
    $log.info('Dropped request due to rate limiting')
    return
  end

  res = nil

  begin
    if @auth and @auth == :basic
      req.basic_auth(@username, @password)
    end
    @last_request_time = Time.now.to_f
    client = Net::HTTP.new(uri.host, uri.port)
    if @use_ssl
      client.use_ssl = true
      client.ca_file = OpenSSL::X509::DEFAULT_CERT_FILE
    end
    res = client.start {|http| http.request(req) }
  rescue => e # rescue all StandardErrors
    # server didn't respond
    $log.warn "Net::HTTP.#{req.method.capitalize} raises exception: #{e.class}, '#{e.message}'"
    raise e if @raise_on_error
  else
     unless res and res.is_a?(Net::HTTPSuccess)
        res_summary = if res
                         "#{res.code} #{res.message} #{res.body}"
                      else
                         "res=nil"
                      end
        $log.warn "failed to #{req.method} #{uri} (#{res_summary})"
     end #end unless
  end # end begin
end

#set_body(req, tag, time, record) ⇒ Object



109
110
111
112
113
114
115
116
# File 'lib/fluent/plugin/out_http_ext.rb', line 109

def set_body(req, tag, time, record)
  if @serializer == :json
    set_json_body(req, record)
  else
    req.set_form_data(record)
  end
  req
end

#set_header(req, tag, time, record) ⇒ Object



118
119
120
121
122
123
# File 'lib/fluent/plugin/out_http_ext.rb', line 118

def set_header(req, tag, time, record)
  @headers.each do |key, value|
    req[key] = value
  end
  req
end

#set_json_body(req, data) ⇒ Object



125
126
127
128
# File 'lib/fluent/plugin/out_http_ext.rb', line 125

def set_json_body(req, data)
  req.body = Yajl.dump(data)
  req['Content-Type'] = 'application/json'
end

#shutdownObject



91
92
93
# File 'lib/fluent/plugin/out_http_ext.rb', line 91

def shutdown
  super
end

#startObject



87
88
89
# File 'lib/fluent/plugin/out_http_ext.rb', line 87

def start
  super
end