Class: LogStash::Inputs::Http
- Inherits:
-
Base
- Object
- Base
- LogStash::Inputs::Http
- Defined in:
- lib/logstash/inputs/http.rb,
lib/logstash/inputs/http/tls.rb,
lib/logstash/inputs/http/message_handler.rb
Overview
Using this input you can receive single or multiline events over http(s). Applications can send an HTTP POST request with a body to the endpoint started by this input and Logstash will convert it into an event for subsequent processing. Users can pass plain text, JSON, or any formatted data and use a corresponding codec with this input. For Content-Type `application/json` the `json` codec is used, but for all other data formats the `plain` codec is used.
This input can also be used to receive webhook requests to integrate with other services and applications. By taking advantage of the vast plugin ecosystem available in Logstash you can trigger actionable events right from your application.
Security
This plugin supports standard HTTP basic authentication headers to identify the requester. You can pass in a username and password combination while sending data to this input.
You can also set up SSL and send data securely over https, with an option of validating the client’s certificate. Currently, the certificate setup is through the Java Keystore format (see docs.oracle.com/cd/E19509-01/820-3503/ggfen/index.html).
Defined Under Namespace
Classes: MessageHandler, TLS
Instance Method Summary collapse
- #build_ssl_params ⇒ Object
- #client_authentication? ⇒ Boolean
- #close ⇒ Object
- #create_http_server(message_handler) ⇒ Object
- #decode_body(headers, remote_address, body, default_codec, additional_codecs) ⇒ Object
- #push_decoded_event(headers, remote_address, event, add_headers = true) ⇒ Object
- #register ⇒ Object
- #require_certificate_authorities? ⇒ Boolean
-
#run(queue) ⇒ Object
def register.
- #ssl_jks_configured? ⇒ Boolean
- #ssl_key_configured? ⇒ Boolean
- #stop ⇒ Object
- #validate_ssl_settings! ⇒ Object
Instance Method Details
#build_ssl_params ⇒ Object
240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 |
# File 'lib/logstash/inputs/http.rb', line 240

# Builds the SSL handshake provider that is passed to the HTTP server.
#
# Returns nil when @ssl is falsy. When both @keystore and @keystore_password
# are set, a JksSslBuilder is created from them. Otherwise an SslSimpleBuilder
# is created from @ssl_certificate, @ssl_key and the optional key passphrase,
# the normalized cipher suites are applied, and the certificate authorities
# are attached when client authentication is configured.
#
# NOTE(review): the keystore branch applies neither the cipher suites nor the
# certificate authorities -- confirm that this is intended.
#
# @return [Object, nil] the value of new_ssl_handshake_provider, or nil when
#   SSL is disabled
# @raise [LogStash::ConfigurationError] when the simple builder rejects the
#   settings with java.lang.IllegalArgumentException
def build_ssl_params
  return nil unless @ssl

  if @keystore && @keystore_password
    ssl_builder = org.logstash.plugins.inputs.http.util.JksSslBuilder.new(@keystore, @keystore_password.value)
  else
    begin
      # The passphrase is optional; only unwrap it when one was configured.
      key_passphrase = @ssl_key_passphrase.nil? ? nil : @ssl_key_passphrase.value
      ssl_builder = org.logstash.plugins.inputs.http.util.SslSimpleBuilder
                       .new(@ssl_certificate, @ssl_key, key_passphrase)
                       .setCipherSuites(normalized_ciphers)
    rescue java.lang.IllegalArgumentException => e
      @logger.error("SSL configuration invalid", error_details(e))
      raise LogStash::ConfigurationError, e
    end

    if client_authentication?
      ssl_builder.setCertificateAuthorities(@ssl_certificate_authorities)
    end
  end

  new_ssl_handshake_provider(ssl_builder)
end
#client_authentication? ⇒ Boolean
271 272 273 |
# File 'lib/logstash/inputs/http.rb', line 271

# Reports whether any certificate authorities have been configured.
#
# The result is coerced to a strict Boolean so that an unset
# @ssl_certificate_authorities yields false rather than nil.
#
# @return [Boolean] true when @ssl_certificate_authorities is set and holds
#   at least one entry
def client_authentication?
  !!(@ssl_certificate_authorities && @ssl_certificate_authorities.size > 0)
end
#close ⇒ Object
162 163 164 |
# File 'lib/logstash/inputs/http.rb', line 162

# Closes the HTTP server on a best-effort basis.
#
# Any StandardError raised while closing (including the NoMethodError that
# results when @http_server was never assigned) is swallowed on purpose.
#
# @return [Object, nil] whatever @http_server.close returns, or nil when
#   closing raised
def close
  @http_server.close
rescue StandardError
  nil
end
#create_http_server(message_handler) ⇒ Object
235 236 237 238 |
# File 'lib/logstash/inputs/http.rb', line 235

# Instantiates the Netty-based HTTP server from the plugin settings.
#
# @param message_handler [Object] handler passed to the server as its third
#   constructor argument
# @return [Object] a new org.logstash.plugins.inputs.http.NettyHttpServer
def create_http_server(message_handler)
  org.logstash.plugins.inputs.http.NettyHttpServer.new(
    @host, @port, message_handler, build_ssl_params,
    @threads, @max_pending_requests, @max_content_length, @response_code
  )
end
#decode_body(headers, remote_address, body, default_codec, additional_codecs) ⇒ Object
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/logstash/inputs/http.rb', line 166

# Decodes a request body into events and pushes each of them to the queue.
#
# Bodies whose content type starts with "application/x-protobuf" are decoded
# by @promtail_input and pushed without request headers. Every other body is
# converted to a String if necessary and decoded by the codec registered for
# its MIME type, falling back to default_codec.
#
# @param headers [Hash] request headers; "content_type" selects the decoder
# @param remote_address [Object] remote address forwarded to push_decoded_event
# @param body [Object] request body; non-String bodies go through
#   @promtail_input.toUTF8String first
# @param default_codec [Object] codec used when no additional codec matches
# @param additional_codecs [Hash] codecs keyed by MIME type
# @return [Boolean] true on success, false when any StandardError was raised
#   (the error is logged, not propagated)
def decode_body(headers, remote_address, body, default_codec, additional_codecs)
  content_type = headers.fetch("content_type", "")

  if content_type.start_with?("application/x-protobuf")
    # events = @promtail_input.decode_str(@promtail_input.toUTF8String(body))
    events = @promtail_input.decode(body)
    events.each do |event|
      push_decoded_event(headers, remote_address, LogStash::Event.new(event), false)
    end
  else
    body_str = body.is_a?(String) ? body : @promtail_input.toUTF8String(body)
    codec = additional_codecs.fetch(HttpUtil.getMimeType(content_type), default_codec)
    codec.decode(body_str) { |event| push_decoded_event(headers, remote_address, event) }
    codec.flush { |event| push_decoded_event(headers, remote_address, event) }
  end

  true
rescue => e
  @logger.error(
    "unable to process event.",
    :message => e.message,
    :class => e.class.name,
    :backtrace => e.backtrace
  )
  false
end
#push_decoded_event(headers, remote_address, event, add_headers = true) ⇒ Object
194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/logstash/inputs/http.rb', line 194

# Enriches a decoded event and appends it to the pipeline queue.
#
# When add_headers is true the request headers and the remote address are
# stored under the configured target fields. A tenant is taken from the
# "tenant" header, falling back to "x_scope_orgid", and stored in the
# "tenant" field when non-empty. The event is then decorated and queued.
#
# The trace line is emitted at DEBUG rather than INFO: it runs once per event
# and prints every request header.
# NOTE(review): the headers may include basic-auth credentials, since the
# plugin supports them -- confirm DEBUG is the desired level.
#
# @param headers [Hash] request headers
# @param remote_address [Object] address of the sender
# @param event [Object] event responding to #set
# @param add_headers [Boolean] whether to copy headers and remote address
#   onto the event
# @return [Object] the result of @queue << event
def push_decoded_event(headers, remote_address, event, add_headers=true)
  if add_headers
    event.set(@request_headers_target_field, headers)
    event.set(@remote_host_target_field, remote_address)
  end

  tenant = headers.fetch("tenant", "")
  tenant = headers.fetch("x_scope_orgid", "") if tenant.empty?
  event.set("tenant", tenant) unless tenant.empty?

  decorate(event)

  if @logger.debug?
    @logger.debug("Pushing request to #{remote_address} with headers #{headers} event: #{event}")
  end
  @queue << event
end
#register ⇒ Object
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/logstash/inputs/http.rb', line 129

# Prepares the plugin: validates the SSL settings, builds the basic-auth
# token, instantiates the additional codecs, creates the Promtail handler and
# finally builds the HTTP server around a MessageHandler.
#
# @return [Object] the HTTP server stored in @http_server
# @raise [LogStash::ConfigurationError] from validate_ssl_settings! when the
#   SSL configuration is inconsistent
def register
  validate_ssl_settings!

  if @user && @password
    token = Base64.strict_encode64("#{@user}:#{@password.value}")
    @auth_token = "Basic #{token}"
  end

  @codecs = Hash.new
  @additional_codecs.each do |content_type, codec|
    @codecs[content_type] = LogStash::Plugin.lookup("codec", codec).new
  end

  @promtail_input = org.logstash.plugins.inputs.http.promtail.PromtailHandler.new

  # Loaded here rather than at file level, as in the original listing.
  require "logstash/inputs/http/message_handler"
  message_handler = MessageHandler.new(self, @codec, @codecs, @auth_token)
  @http_server = create_http_server(message_handler)
end
#require_certificate_authorities? ⇒ Boolean
275 276 277 |
# File 'lib/logstash/inputs/http.rb', line 275

# Reports whether the effective verify mode demands certificate authorities.
#
# @return [Boolean] true when @ssl_verify_mode_final is "force_peer" or "peer"
def require_certificate_authorities?
  @ssl_verify_mode_final == "force_peer" || @ssl_verify_mode_final == "peer"
end
#run(queue) ⇒ Object
def register
152 153 154 155 156 |
# File 'lib/logstash/inputs/http.rb', line 152

# Stores the pipeline queue, logs the listener address and runs the server.
#
# @param queue [Object] queue that push_decoded_event appends events to
# @return [Object] whatever @http_server.run returns
def run(queue)
  @queue = queue
  @logger.info(
    "Starting http input listener",
    :address => "#{@host}:#{@port}",
    :ssl => @ssl.to_s
  )
  @http_server.run
end
#ssl_jks_configured? ⇒ Boolean
267 268 269 |
# File 'lib/logstash/inputs/http.rb', line 267

# Reports whether a Java keystore and its password are both configured.
#
# @return [Boolean] true when @keystore and @keystore_password are both set
def ssl_jks_configured?
  !!(@keystore && @keystore_password)
end
#ssl_key_configured? ⇒ Boolean
263 264 265 |
# File 'lib/logstash/inputs/http.rb', line 263

# Reports whether a certificate and its private key are both configured.
#
# @return [Boolean] true when @ssl_certificate and @ssl_key are both set
def ssl_key_configured?
  !!(@ssl_certificate && @ssl_key)
end
#stop ⇒ Object
158 159 160 |
# File 'lib/logstash/inputs/http.rb', line 158

# Stops the input by closing the HTTP server on a best-effort basis.
#
# Any StandardError raised while closing (including the NoMethodError that
# results when @http_server was never assigned) is swallowed on purpose.
#
# @return [Object, nil] whatever @http_server.close returns, or nil when
#   closing raised
def stop
  @http_server.close
rescue StandardError
  nil
end
#validate_ssl_settings! ⇒ Object
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 |
# File 'lib/logstash/inputs/http.rb', line 209

# Validates the SSL-related settings and resolves the effective verify mode.
#
# Without SSL, a warning is logged for each SSL setting that will be ignored.
# With SSL, either a certificate/key pair or a keystore must be configured.
# The effective mode is stored in @ssl_verify_mode_final: `verify_mode` when
# it was set explicitly, `ssl_verify_mode` otherwise. Finally, the verify
# mode and the certificate authorities must agree with each other.
#
# @return [void]
# @raise [LogStash::ConfigurationError] when SSL is enabled and no
#   certificate or keystore is configured, when both verify-mode options are
#   set, or when the verify mode and the certificate authorities disagree
def validate_ssl_settings!
  if !@ssl
    @logger.warn("SSL Certificate will not be used") if @ssl_certificate
    @logger.warn("SSL Key will not be used") if @ssl_key
    @logger.warn("SSL Java Key Store will not be used") if @keystore
  elsif !(ssl_key_configured? || ssl_jks_configured?)
    raise LogStash::ConfigurationError, "Certificate or JKS must be configured"
  end

  if @ssl && (original_params.key?("verify_mode") && original_params.key?("ssl_verify_mode"))
    raise LogStash::ConfigurationError, "Both 'ssl_verify_mode' and 'verify_mode' were set. Use only 'ssl_verify_mode'."
  elsif original_params.key?("verify_mode")
    @ssl_verify_mode_final = @verify_mode
  else
    # Covers both an explicit `ssl_verify_mode` and the case where neither
    # option was given.
    @ssl_verify_mode_final = @ssl_verify_mode
  end

  if @ssl && require_certificate_authorities? && !client_authentication?
    raise LogStash::ConfigurationError, "Using `ssl_verify_mode` or `verify_mode` set to PEER or FORCE_PEER, requires the configuration of `ssl_certificate_authorities`"
  elsif @ssl && !require_certificate_authorities? && client_authentication?
    raise LogStash::ConfigurationError, "The configuration of `ssl_certificate_authorities` requires setting `ssl_verify_mode` or `verify_mode` to PEER or FORCE_PEER"
  end
end