Class: Fluent::Plugin::HttpInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_http.rb

Defined Under Namespace

Classes: Handler, KeepaliveManager

Constant Summary collapse

EMPTY_GIF_IMAGE =
"GIF89a\u0001\u0000\u0001\u0000\x80\xFF\u0000\xFF\xFF\xFF\u0000\u0000\u0000,\u0000\u0000\u0000\u0000\u0001\u0000\u0001\u0000\u0000\u0002\u0002D\u0001\u0000;".force_encoding("UTF-8")
EVENT_RECORD_PARAMETER =
'_event_record'

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary

Attributes included from Fluent::PluginLoggerMixin

#log

Attributes inherited from Base

#under_plugin_development

Instance Method Summary collapse

Methods included from Fluent::PluginHelper::Mixin

included

Methods included from Fluent::PluginLoggerMixin

included, #initialize, #terminate

Methods included from Fluent::PluginId

#initialize, #plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir

Methods inherited from Base

#after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #has_router?, #initialize, #inspect, #plugin_root_dir, #shutdown, #shutdown?, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?

Methods included from SystemConfig::Mixin

#system_config, #system_config_override

Methods included from Configurable

#config, #configure_proxy_generate, #configured_section_create, included, #initialize, lookup_type, register_type

Instance Method Details

#closeObject



147
148
149
150
# File 'lib/fluent/plugin/in_http.rb', line 147

def close
  @lsock.close
  super
end

#configure(conf) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/fluent/plugin/in_http.rb', line 70

def configure(conf)
  compat_parameters_convert(conf, :parser)

  super

  m = if @parser_configs.first['@type'] == 'in_http'
        @parser_msgpack = parser_create(usage: 'parser_in_http_msgpack', type: 'msgpack')
        @parser_msgpack.estimate_current_event = false
        @parser_json = parser_create(usage: 'parser_in_http_json', type: 'json')
        @parser_json.estimate_current_event = false
        @format_name = 'default'
        method(:parse_params_default)
      else
        @parser = parser_create
        @format_name = @parser_configs.first['@type']
        method(:parse_params_with_parser)
      end
  self.singleton_class.module_eval do
    define_method(:parse_params, m)
  end
end

#multi_workers_ready?Boolean

Returns:

  • (Boolean)


116
117
118
# File 'lib/fluent/plugin/in_http.rb', line 116

def multi_workers_ready?
  true
end

#on_request(path_info, params) ⇒ Object



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/fluent/plugin/in_http.rb', line 152

def on_request(path_info, params)
  begin
    path = path_info[1..-1]  # remove /
    tag = path.split('/').join('.')
    record_time, record = parse_params(params)

    # Skip nil record
    if record.nil?
      if @respond_with_empty_img
        return ["200 OK", {'Content-Type'=>'image/gif; charset=utf-8'}, EMPTY_GIF_IMAGE]
      else
        return ["200 OK", {'Content-Type'=>'text/plain'}, ""]
      end
    end

    unless record.is_a?(Array)
      if @add_http_headers
        params.each_pair { |k,v|
          if k.start_with?("HTTP_")
            record[k] = v
          end
        }
      end
      if @add_remote_addr
        record['REMOTE_ADDR'] = params['REMOTE_ADDR']
      end
    end
    time = if param_time = params['time']
             param_time = param_time.to_f
             param_time.zero? ? Fluent::Engine.now : @float_time_parser.parse(param_time)
           else
             record_time.nil? ? Fluent::Engine.now : record_time
           end
  rescue
    return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{$!}\n"]
  end

  # TODO server error
  begin
    # Support batched requests
    if record.is_a?(Array)
      mes = Fluent::MultiEventStream.new
      record.each do |single_record|
        if @add_http_headers
          params.each_pair { |k,v|
            if k.start_with?("HTTP_")
              single_record[k] = v
            end
          }
        end
        if @add_remote_addr
          single_record['REMOTE_ADDR'] = params['REMOTE_ADDR']
        end
        single_time = single_record.delete("time") || time
        mes.add(single_time, single_record)
      end
      router.emit_stream(tag, mes)
    else
      router.emit(tag, time, record)
    end
  rescue
    return ["500 Internal Server Error", {'Content-Type'=>'text/plain'}, "500 Internal Server Error\n#{$!}\n"]
  end

  if @respond_with_empty_img
    return ["200 OK", {'Content-Type'=>'image/gif; charset=utf-8'}, EMPTY_GIF_IMAGE]
  else
    return ["200 OK", {'Content-Type'=>'text/plain'}, ""]
  end
end

#startObject



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/fluent/plugin/in_http.rb', line 120

def start
  @_event_loop_run_timeout = @blocking_timeout

  super

  log.debug "listening http", bind: @bind, port: @port

  socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
  if Fluent.windows?
    socket_manager_path = socket_manager_path.to_i
  end
  client = ServerEngine::SocketManager::Client.new(socket_manager_path)
  lsock = client.listen_tcp(@bind, @port)

  @km = KeepaliveManager.new(@keepalive_timeout)
  @lsock = Coolio::TCPServer.new(
    lsock, nil, Handler, @km, method(:on_request),
    @body_size_limit, @format_name, log,
    @cors_allow_origins
  )
  @lsock.listen(@backlog) unless @backlog.nil?
  event_loop_attach(@km)
  event_loop_attach(@lsock)

  @float_time_parser = Fluent::NumericTimeParser.new(:float)
end