Class: Fluent::HttpInput

Inherits:
Input
  • Object
show all
Includes:
DetachMultiProcessMixin
Defined in:
lib/fluent/plugin/in_http.rb

Defined Under Namespace

Classes: Handler, KeepaliveManager

Constant Summary collapse

# Response body that #on_request returns with an 'image/gif' content type when
# @respond_with_empty_img is set. The bytes start with a GIF89a signature
# followed by a width of 1 and a height of 1.
EMPTY_GIF_IMAGE =
"GIF89a\u0001\u0000\u0001\u0000\x80\xFF\u0000\xFF\xFF\xFF\u0000\u0000\u0000,\u0000\u0000\u0000\u0000\u0001\u0000\u0001\u0000\u0000\u0002\u0002D\u0001\u0000;".force_encoding("UTF-8")

Instance Method Summary collapse

Methods included from DetachProcessImpl

#on_detach_process, #on_exit_process

Constructor Details

#initialize ⇒ HttpInput

Returns a new instance of HttpInput.



35
36
37
38
# File 'lib/fluent/plugin/in_http.rb', line 35

# Returns a new instance of HttpInput.
#
# 'webrick/httputils' is required here, at construction time, rather than when
# the file is loaded; nothing in this method uses it directly.
# NOTE(review): presumably the request handler relies on WEBrick::HTTPUtils — confirm.
def initialize
  require 'webrick/httputils'
  # Bare `super` forwards this method's (empty) argument list to the parent.
  super
end

Instance Method Details

#configure(conf) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/fluent/plugin/in_http.rb', line 63

# Applies the configuration and installs the request-parameter parser.
#
# After the parent class has processed +conf+, a +parse_params+ method is
# defined on this instance only. It is bound to +parse_params_default+ when
# @format is 'default'; otherwise a parser is built with Plugin.new_parser,
# configured with the same +conf+, stored in @parser, and +parse_params+ is
# bound to +parse_params_with_parser+.
#
# @param conf the configuration object handed to the parent and to the parser
def configure(conf)
  super

  parse_impl =
    if @format == 'default'
      method(:parse_params_default)
    else
      @parser = Plugin.new_parser(@format)
      @parser.configure(conf)
      method(:parse_params_with_parser)
    end

  # Define the method on the singleton class so only this instance is affected.
  singleton_class.module_eval do
    define_method(:parse_params, parse_impl)
  end
end

#on_request(path_info, params) ⇒ Object



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/fluent/plugin/in_http.rb', line 144

# Handles one HTTP request: turns it into one or more events and emits them.
#
# The tag is +path_info+ without its first character, with '/' separators
# replaced by '.'. +parse_params+ supplies a [time, record] pair; the record
# may be nil (nothing to emit), a single record, or an Array of records
# (batched request).
#
# When @add_http_headers is set, every +params+ entry whose key starts with
# "HTTP_" is copied into each record; when @add_remote_addr is set,
# params['REMOTE_ADDR'] is copied too.
#
# Event time: params['time'] wins when present (a value that converts to 0.0
# means "now"); otherwise the parsed record time, or Engine.now if that is nil.
# In a batch, a record's own "time" key is removed and used in preference.
#
# @param path_info [String] request path, e.g. "/app/access"
# @param params [Hash] request parameters keyed by String
# @return [Array] [status line, headers hash, body]; 400 if preparing the
#   event raised, 500 if emitting raised, 200 otherwise
def on_request(path_info, params)
  # Success response; its shape depends on @respond_with_empty_img.
  ok_response = lambda do
    if @respond_with_empty_img
      ["200 OK", {'Content-Type'=>'image/gif; charset=utf-8'}, EMPTY_GIF_IMAGE]
    else
      ["200 OK", {'Content-Type'=>'text/plain'}, ""]
    end
  end

  # Copies the optional request metadata into a single record.
  enrich = lambda do |event|
    if @add_http_headers
      params.each_pair do |key, value|
        event[key] = value if key.start_with?("HTTP_")
      end
    end
    event['REMOTE_ADDR'] = params['REMOTE_ADDR'] if @add_remote_addr
  end

  begin
    # Drop the leading character ("/") and turn the rest into a dotted tag.
    tag = path_info[1..-1].split('/').join('.')
    record_time, record = parse_params(params)

    # Nothing was parsed: acknowledge without emitting.
    return ok_response.call if record.nil?

    # Batched records are enriched one by one in the emit phase below.
    enrich.call(record) unless record.is_a?(Array)

    requested_time = params['time']
    time =
      if requested_time
        seconds = requested_time.to_f
        seconds.zero? ? Engine.now : Fluent::EventTime.from_time(Time.at(seconds))
      else
        record_time.nil? ? Engine.now : record_time
      end
  rescue => e
    return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{e}\n"]
  end

  # TODO server error
  begin
    if record.is_a?(Array)
      # Batched request: collect every record into one stream.
      stream = MultiEventStream.new
      record.each do |event|
        enrich.call(event)
        stream.add(event.delete("time") || time, event)
      end
      router.emit_stream(tag, stream)
    else
      router.emit(tag, time, record)
    end
  rescue => e
    return ["500 Internal Server Error", {'Content-Type'=>'text/plain'}, "500 Internal Server Error\n#{e}\n"]
  end

  ok_response.call
end

#run ⇒ Object



137
138
139
140
141
142
# File 'lib/fluent/plugin/in_http.rb', line 137

# Runs the event loop stored in @loop, passing @blocking_timeout to it.
#
# Any StandardError raised by the loop is logged (message plus backtrace)
# instead of being propagated to the caller.
def run
  begin
    @loop.run(@blocking_timeout)
  rescue => e
    log.error "unexpected error", error: e.to_s
    log.error_backtrace
  end
end

#shutdown ⇒ Object



128
129
130
131
132
133
134
135
# File 'lib/fluent/plugin/in_http.rb', line 128

# Stops the HTTP input: detaches every watcher from the event loop, stops the
# loop, closes the listening server, waits for the loop thread to finish, and
# finally hands over to the parent class's shutdown.
def shutdown
  @loop.watchers.each(&:detach)
  @loop.stop
  @lsock.close
  # Wait for the thread created in #start before continuing up the chain.
  @thread.join

  super
end

#start ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/fluent/plugin/in_http.rb', line 102

# Starts listening for HTTP requests.
#
# A listening TCP socket for @bind:@port is requested from the ServerEngine
# socket manager, whose path is read from the SERVERENGINE_SOCKETMANAGER_PATH
# environment variable (converted with to_i on Windows). Inside
# detach_multi_process the parent's start is run, a Coolio::TCPServer is built
# around that socket with Handler, and the keepalive manager and the server are
# attached to a new Coolio::Loop that is driven by #run on its own thread.
def start
  log.debug "listening http on #{@bind}:#{@port}"

  manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
  manager_path = manager_path.to_i if Fluent.windows?
  listener = ServerEngine::SocketManager::Client.new(manager_path).listen_tcp(@bind, @port)

  detach_multi_process do
    super
    @km = KeepaliveManager.new(@keepalive_timeout)
    @lsock = Coolio::TCPServer.new(
      listener, nil, Handler, @km, method(:on_request),
      @body_size_limit, @format, log, @cors_allow_origins
    )
    # A nil @backlog skips the explicit listen call.
    @lsock.listen(@backlog) unless @backlog.nil?

    @loop = Coolio::Loop.new
    [@km, @lsock].each { |watcher| @loop.attach(watcher) }

    @thread = Thread.new { run }
  end
end