Class: LogStash::Inputs::Http

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/http.rb,
lib/logstash/inputs/http/tls.rb,
lib/logstash/inputs/http/message_handler.rb

Overview

Using this input you can receive single or multiline events over http(s). Applications can send an HTTP POST request with a body to the endpoint started by this input and Logstash will convert it into an event for subsequent processing. Users can pass plain text, JSON, or any formatted data and use a corresponding codec with this input. For Content-Type `application/json` the `json` codec is used, but for all other data formats, the `plain` codec is used.

This input can also be used to receive webhook requests to integrate with other services and applications. By taking advantage of the vast plugin ecosystem available in Logstash you can trigger actionable events right from your application.

Security

This plugin supports standard HTTP basic authentication headers to identify the requester. You can pass in a username and password combination while sending data to this input.

You can also set up SSL and send data securely over https, with an option of validating the client’s certificate. Currently, the certificate setup is through the Java Keystore format (see docs.oracle.com/cd/E19509-01/820-3503/ggfen/index.html).

Defined Under Namespace

Classes: MessageHandler, TLS

Instance Method Summary collapse

Instance Method Details

#build_ssl_params ⇒ Object



240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/logstash/inputs/http.rb', line 240

# Builds the SSL handshake provider handed to the HTTP server.
#
# Returns nil when SSL is disabled. When both a keystore and a keystore
# password are set, a JKS-based builder is used as-is. Otherwise a builder is
# created from the certificate/key pair (with the optional key passphrase),
# the normalized cipher suites are applied, and certificate authorities are
# attached when client authentication is configured.
#
# @return [Object, nil] whatever `new_ssl_handshake_provider` returns, or nil
#   when `@ssl` is falsy
# @raise [LogStash::ConfigurationError] when the certificate/key based builder
#   rejects its arguments with a java.lang.IllegalArgumentException
def build_ssl_params
  return nil unless @ssl

  builder =
    if @keystore && @keystore_password
      org.logstash.plugins.inputs.http.util.JksSslBuilder.new(@keystore, @keystore_password.value)
    else
      # The passphrase is optional; only unwrap it when one was supplied.
      passphrase = @ssl_key_passphrase.nil? ? nil : @ssl_key_passphrase.value

      pem_builder = begin
        org.logstash.plugins.inputs.http.util.SslSimpleBuilder
          .new(@ssl_certificate, @ssl_key, passphrase)
          .setCipherSuites(normalized_ciphers)
      rescue java.lang.IllegalArgumentException => e
        @logger.error("SSL configuration invalid", error_details(e))
        raise LogStash::ConfigurationError, e
      end

      pem_builder.setCertificateAuthorities(@ssl_certificate_authorities) if client_authentication?
      pem_builder
    end

  new_ssl_handshake_provider(builder)
end

#client_authentication? ⇒ Boolean

Returns:

  • (Boolean)


271
272
273
# File 'lib/logstash/inputs/http.rb', line 271

# Tells whether at least one certificate authority has been configured.
#
# The result is coerced to a strict boolean so that an unset
# `@ssl_certificate_authorities` yields `false` rather than `nil`.
#
# @return [Boolean] true when `@ssl_certificate_authorities` is set and has
#   at least one entry
def client_authentication?
  !!(@ssl_certificate_authorities && @ssl_certificate_authorities.size > 0)
end

#close ⇒ Object



162
163
164
# File 'lib/logstash/inputs/http.rb', line 162

# Closes the HTTP server on a best-effort basis.
#
# Any StandardError raised while closing (including the case where
# `@http_server` was never assigned) is swallowed.
#
# @return [Object, nil] the result of `@http_server.close`, or nil if it raised
def close
  begin
    @http_server.close
  rescue StandardError
    nil
  end
end

#create_http_server(message_handler) ⇒ Object



235
236
237
238
# File 'lib/logstash/inputs/http.rb', line 235

# Instantiates the Netty-backed HTTP server from the plugin's settings.
#
# @param message_handler [Object] handler passed through to the server
#   constructor
# @return [Object] a new `NettyHttpServer` built from host, port, the handler,
#   the SSL parameters from `build_ssl_params`, thread count, pending-request
#   limit, content-length limit and response code
def create_http_server(message_handler)
  server_class = org.logstash.plugins.inputs.http.NettyHttpServer
  ssl_params = build_ssl_params

  server_class.new(
    @host,
    @port,
    message_handler,
    ssl_params,
    @threads,
    @max_pending_requests,
    @max_content_length,
    @response_code
  )
end

#decode_body(headers, remote_address, body, default_codec, additional_codecs) ⇒ Object



166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/logstash/inputs/http.rb', line 166

# Decodes a request body into events and pushes each one via
# `push_decoded_event`.
#
# Bodies whose content type starts with "application/x-protobuf" are decoded
# by `@promtail_input`; each decoded entry is wrapped in a LogStash::Event and
# pushed without the request header / remote host fields. Every other body is
# converted to a String when necessary and run through the codec registered
# for its MIME type, falling back to `default_codec`; the codec is flushed
# afterwards so buffered events are pushed too.
#
# @param headers [Hash] request headers; "content_type" is read with `fetch`
# @param remote_address [Object] passed through to `push_decoded_event`
# @param body [String, Object] request body; non-String bodies are converted
#   with `@promtail_input.toUTF8String` on the codec path
# @param default_codec [Object] codec used when no additional codec matches
# @param additional_codecs [Hash] codecs keyed by MIME type
# @return [Boolean] true on success, false when any StandardError was raised
#   (the error is logged, not re-raised)
def decode_body(headers, remote_address, body, default_codec, additional_codecs)
  content_type = headers.fetch("content_type", "")

  if content_type.start_with?("application/x-protobuf")
    @promtail_input.decode(body).each do |decoded|
      push_decoded_event(headers, remote_address, LogStash::Event.new(decoded), false)
    end
  else
    text = body.is_a?(String) ? body : @promtail_input.toUTF8String(body)
    mime_type = HttpUtil.getMimeType(content_type)
    codec = additional_codecs.fetch(mime_type, default_codec)

    # The same forwarding block serves both decoded and flushed events.
    forward = proc { |event| push_decoded_event(headers, remote_address, event) }
    codec.decode(text, &forward)
    codec.flush(&forward)
  end

  true
rescue => e
  @logger.error(
    "unable to process event.",
    :message => e.message,
    :class => e.class.name,
    :backtrace => e.backtrace
  )
  false
end

#push_decoded_event(headers, remote_address, event, add_headers = true) ⇒ Object



194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/logstash/inputs/http.rb', line 194

# Enriches a decoded event and appends it to the pipeline queue.
#
# When `add_headers` is true the request headers and remote address are stored
# on the event under the configured target fields. A tenant is taken from the
# "tenant" header, falling back to "x_scope_orgid"; when non-empty it is
# stored in the event's "tenant" field. The event is then decorated and queued.
#
# The per-event log is emitted at DEBUG level only, and credential-bearing
# headers are removed from the logged copy. This plugin accepts HTTP Basic
# authentication, so logging the raw headers would write credentials to the
# log for every event.
#
# @param headers [Hash] request headers (read with `fetch`)
# @param remote_address [Object] address of the requester
# @param event [Object] event responding to `set`
# @param add_headers [Boolean] whether to store headers and remote address on
#   the event
# @return [Object] the result of `@queue << event`
def push_decoded_event(headers, remote_address, event, add_headers=true)
  if add_headers
    event.set(@request_headers_target_field, headers)
    event.set(@remote_host_target_field, remote_address)
  end

  tenant = headers.fetch("tenant", "")
  tenant = headers.fetch("x_scope_orgid", "") if tenant.empty?
  event.set("tenant", tenant) unless tenant.empty?

  decorate(event)

  # Guarded so the filtered copy and string conversion are only built when
  # debug logging is actually enabled.
  if @logger.debug?
    loggable_headers = headers.reject do |name, _value|
      %w[authorization proxy_authorization].include?(name.to_s.downcase.tr("-", "_"))
    end
    @logger.debug(
      "Pushing decoded event to queue",
      :remote_address => remote_address,
      :headers => loggable_headers,
      :event => event.to_s
    )
  end

  @queue << event
end

#register ⇒ Object



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/logstash/inputs/http.rb', line 129

# Prepares the plugin: validates the SSL settings, derives the expected
# Basic-auth token, instantiates the additional codecs, and creates the
# HTTP server.
#
# Sets `@auth_token` (only when both user and password are configured),
# `@codecs`, `@promtail_input` and `@http_server`.
#
# @return [Object] the HTTP server returned by `create_http_server`
def register
  validate_ssl_settings!

  if @user && @password
    credentials = "#{@user}:#{@password.value}"
    @auth_token = "Basic #{Base64.strict_encode64(credentials)}"
  end

  # One codec instance per configured content type.
  @codecs = {}
  @additional_codecs.each do |mime_type, codec_name|
    codec_class = LogStash::Plugin.lookup("codec", codec_name)
    @codecs[mime_type] = codec_class.new
  end

  @promtail_input = org.logstash.plugins.inputs.http.promtail.PromtailHandler.new

  # Loaded here rather than at file load time, as in the original code.
  require "logstash/inputs/http/message_handler"
  handler = MessageHandler.new(self, @codec, @codecs, @auth_token)
  @http_server = create_http_server(handler)
end

#require_certificate_authorities? ⇒ Boolean

Returns:

  • (Boolean)


275
276
277
# File 'lib/logstash/inputs/http.rb', line 275

# Tells whether the effective verify mode demands certificate authorities.
#
# @return [Boolean] true when `@ssl_verify_mode_final` is "force_peer" or
#   "peer"
def require_certificate_authorities?
  %w[force_peer peer].include?(@ssl_verify_mode_final)
end

#run(queue) ⇒ Object

def register



152
153
154
155
156
# File 'lib/logstash/inputs/http.rb', line 152

# Remembers the pipeline queue, logs the listener address and SSL flag, and
# hands control to the HTTP server's `run`.
#
# @param queue [Object] destination that decoded events are appended to
# @return [Object] whatever `@http_server.run` returns
def run(queue)
  @queue = queue

  address = "#{@host}:#{@port}"
  ssl_state = @ssl.to_s
  @logger.info("Starting http input listener", :address => address, :ssl => ssl_state)

  @http_server.run
end

#ssl_jks_configured? ⇒ Boolean

Returns:

  • (Boolean)


267
268
269
# File 'lib/logstash/inputs/http.rb', line 267

# Tells whether a Java keystore and its password are both configured.
#
# @return [Boolean] true only when `@keystore` and `@keystore_password` are
#   both truthy
def ssl_jks_configured?
  @keystore && @keystore_password ? true : false
end

#ssl_key_configured? ⇒ Boolean

Returns:

  • (Boolean)


263
264
265
# File 'lib/logstash/inputs/http.rb', line 263

# Tells whether a certificate and its private key are both configured.
#
# @return [Boolean] true only when `@ssl_certificate` and `@ssl_key` are both
#   truthy
def ssl_key_configured?
  @ssl_certificate && @ssl_key ? true : false
end

#stop ⇒ Object



158
159
160
# File 'lib/logstash/inputs/http.rb', line 158

# Stops the plugin by closing the HTTP server on a best-effort basis.
#
# Any StandardError raised while closing (including the case where
# `@http_server` was never assigned) is swallowed.
#
# @return [Object, nil] the result of `@http_server.close`, or nil if it raised
def stop
  begin
    @http_server.close
  rescue StandardError
    nil
  end
end

#validate_ssl_settings! ⇒ Object



209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/logstash/inputs/http.rb', line 209

# Validates the SSL-related settings and resolves the effective verify mode
# into `@ssl_verify_mode_final`.
#
# With SSL disabled, a warning is logged for each certificate, key or keystore
# setting that will go unused. With SSL enabled, either a certificate/key pair
# or a JKS keystore must be configured, `verify_mode` and `ssl_verify_mode`
# may not both be set, and certificate authorities must be configured exactly
# when the verify mode is "peer" or "force_peer".
#
# The effective verify mode is `@verify_mode` when "verify_mode" was supplied
# explicitly, otherwise `@ssl_verify_mode`.
#
# @return [nil]
# @raise [LogStash::ConfigurationError] when any of the rules above is broken
def validate_ssl_settings!
  if @ssl
    unless ssl_key_configured? || ssl_jks_configured?
      raise LogStash::ConfigurationError, "Certificate or JKS must be configured"
    end
  else
    @logger.warn("SSL Certificate will not be used") if @ssl_certificate
    @logger.warn("SSL Key will not be used") if @ssl_key
    @logger.warn("SSL Java Key Store will not be used") if @keystore
  end

  legacy_mode_given = original_params.key?("verify_mode")
  if @ssl && legacy_mode_given && original_params.key?("ssl_verify_mode")
    raise LogStash::ConfigurationError, "Both 'ssl_verify_mode' and 'verify_mode' were set. Use only 'ssl_verify_mode'."
  end
  @ssl_verify_mode_final = legacy_mode_given ? @verify_mode : @ssl_verify_mode

  # The remaining cross-checks only matter when SSL is enabled.
  return unless @ssl

  cas_required = require_certificate_authorities?
  cas_present = client_authentication?
  if cas_required && !cas_present
    raise LogStash::ConfigurationError, "Using `ssl_verify_mode` or `verify_mode` set to PEER or FORCE_PEER, requires the configuration of `ssl_certificate_authorities`"
  elsif !cas_required && cas_present
    raise LogStash::ConfigurationError, "The configuration of `ssl_certificate_authorities` requires setting `ssl_verify_mode` or `verify_mode` to PEER or FORCE_PEER"
  end
end