Class: Fluent::Plugin::HttpInput::Handler
- Inherits:
-
Object
- Object
- Fluent::Plugin::HttpInput::Handler
- Defined in:
- lib/fluent/plugin/in_http.rb
Constant Summary collapse
- RES_200_STATUS =
"200 OK".freeze
- RES_403_STATUS =
"403 Forbidden".freeze
Instance Attribute Summary collapse
-
#content_type ⇒ Object
readonly
Returns the value of attribute content_type.
Instance Method Summary collapse
- #close ⇒ Object
- #closing? ⇒ Boolean
-
#handle_get_request ⇒ Object
Azure App Service sends GET requests for health-checking purposes.
-
#handle_options_request ⇒ Object
Web browsers can send an OPTIONS request before performing POST to check if cross-origin requests are supported.
- #include_cors_allow_origin ⇒ Object
-
#initialize(io, km, callback, body_size_limit, format_name, log, cors_allow_origins, cors_allow_credentials, add_query_params) ⇒ Handler
constructor
A new instance of Handler.
- #on_body(chunk) ⇒ Object
- #on_close ⇒ Object
- #on_headers_complete(headers) ⇒ Object
- #on_message_begin ⇒ Object
- #on_message_complete ⇒ Object
- #on_read(data) ⇒ Object
- #on_write_complete ⇒ Object
- #parse_query(query) ⇒ Object
- #send_response(code, header, body) ⇒ Object
- #send_response_and_close(code, header, body) ⇒ Object
- #send_response_nobody(code, header) ⇒ Object
- #step_idle ⇒ Object
Constructor Details
#initialize(io, km, callback, body_size_limit, format_name, log, cors_allow_origins, cors_allow_credentials, add_query_params) ⇒ Handler
Returns a new instance of Handler.
346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 |
# File 'lib/fluent/plugin/in_http.rb', line 346

# Builds a handler bound to one connection object.
#
# Stores the collaborators and settings, registers itself with +km+, records
# the peer port/address reported by +io+, and creates the HTTP parser that
# calls back into this object.
#
# @param io [#remote_port, #remote_addr, #write, #close] connection object
# @param km [#add, #delete] registry this handler adds itself to
#   (presumably the keep-alive manager; verify against the caller)
# @param callback [#call] invoked as +callback.call(path_info, params)+ and
#   expected to return +[code, header, body]+
# @param body_size_limit [Integer] maximum accepted request body size in bytes
# @param format_name [String] when not 'default', the raw body is handed over
#   as a single event-record parameter
# @param log [#warn, #warn_backtrace] logger
# @param cors_allow_origins [Array<String>, nil] allowed origins; nil disables
#   the CORS checks
# @param cors_allow_credentials [Boolean] whether to emit
#   Access-Control-Allow-Credentials for matched origins
# @param add_query_params [Boolean] whether to expose query parameters as
#   QUERY_* entries
def initialize(io, km, callback, body_size_limit, format_name, log,
               cors_allow_origins, cors_allow_credentials, add_query_params)
  # Collaborators.
  @io = io
  @km = km
  @callback = callback
  @log = log

  # Settings.
  @body_size_limit = body_size_limit
  @format_name = format_name
  @add_query_params = add_query_params
  @cors_allow_origins = cors_allow_origins
  @cors_allow_credentials = cors_allow_credentials

  # Connection state.
  @next_close = false
  @idle = 0

  @km.add(self)
  @remote_port = io.remote_port
  @remote_addr = io.remote_addr
  @parser = Http::Parser.new(self)
end
Instance Attribute Details
#content_type ⇒ Object (readonly)
Returns the value of attribute content_type.
344 345 346 |
# File 'lib/fluent/plugin/in_http.rb', line 344

# Read-only accessor for the Content-Type value stored by
# #on_headers_complete.
#
# @return [String, nil] the stored value; nil until headers have been processed
def content_type
  @content_type
end
Instance Method Details
#close ⇒ Object
590 591 592 |
# File 'lib/fluent/plugin/in_http.rb', line 590

# Closes the underlying connection object immediately.
#
# @return [Object] whatever +@io.close+ returns
def close
  @io.close
end
#closing? ⇒ Boolean
603 604 605 |
# File 'lib/fluent/plugin/in_http.rb', line 603

# Tells whether a closing response has already been sent, i.e. the connection
# is to be closed once the pending write completes.
#
# @return [Boolean] the current value of the close-after-write flag
def closing?
  @next_close
end
#handle_get_request ⇒ Object
Azure App Service sends GET requests for health-checking purposes. Respond with `200 OK` to accommodate them.
455 456 457 |
# File 'lib/fluent/plugin/in_http.rb', line 455

# Answers a GET request with an empty "200 OK" response and marks the
# connection to be closed afterwards.
#
# @return [Object] the result of #send_response_and_close
def handle_get_request
  send_response_and_close(RES_200_STATUS, {}, "")
end
#handle_options_request ⇒ Object
Web browsers can send an OPTIONS request before performing POST to check if cross-origin requests are supported.
461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 |
# File 'lib/fluent/plugin/in_http.rb', line 461

# Answers a CORS preflight (OPTIONS) request.
#
# Responds "403 Forbidden" when CORS is not configured, when the requested
# method is anything but POST, or when the request origin is not allowed.
# Otherwise responds "200 OK" with the Access-Control-Allow-* headers. Every
# response goes through #send_response_and_close, so the connection is closed
# afterwards.
#
# @return [Object] the result of #send_response_and_close
def handle_options_request
  # Is CORS enabled in the first place?
  return send_response_and_close(RES_403_STATUS, {}, "") if @cors_allow_origins.nil?

  # in_http does not support HTTP methods except POST
  return send_response_and_close(RES_403_STATUS, {}, "") if @access_control_request_method != 'POST'

  header = {
    "Access-Control-Allow-Methods" => "POST",
    "Access-Control-Allow-Headers" => @access_control_request_headers || "",
  }

  # Check the origin and send back a CORS response
  if @cors_allow_origins.include?('*')
    header["Access-Control-Allow-Origin"] = "*"
  elsif include_cors_allow_origin
    header["Access-Control-Allow-Origin"] = @origin
    # Credentials are only advertised for an explicitly matched origin,
    # never together with the "*" wildcard.
    header["Access-Control-Allow-Credentials"] = true if @cors_allow_credentials
  else
    return send_response_and_close(RES_403_STATUS, {}, "")
  end

  send_response_and_close(RES_200_STATUS, header, "")
end
#include_cors_allow_origin ⇒ Object
630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 |
# File 'lib/fluent/plugin/in_http.rb', line 630

# Checks whether the request's Origin is covered by the configured allow list.
#
# An origin is allowed when it equals an entry exactly, or when it matches an
# entry containing a "*" wildcard: the text before the first "*" must be a
# prefix of the origin and the text after it a suffix.
#
# Entries without "*" are only ever compared exactly. Splitting such an entry
# on "*" yields no suffix, and passing that nil to String#end_with? raises
# TypeError whenever the origin merely starts with the entry.
#
# @return [Boolean] true if the origin is allowed; false if it is not or if
#   the request carried no Origin
def include_cors_allow_origin
  return false if @origin.nil?
  return true if @cors_allow_origins.include?(@origin)

  @cors_allow_origins.any? do |allowed|
    # Exact matches were handled above; this also skips empty entries.
    next false unless allowed.include?("*")

    prefix, suffix = allowed.split("*", 2)
    # The length guard keeps prefix and suffix from overlapping inside a
    # too-short origin (e.g. "http://a*a" must not match "http://a").
    @origin.length >= prefix.length + suffix.length &&
      @origin.start_with?(prefix) &&
      @origin.end_with?(suffix)
  end
end
#on_body(chunk) ⇒ Object
440 441 442 443 444 445 446 447 448 |
# File 'lib/fluent/plugin/in_http.rb', line 440

# Parser callback: appends one chunk of the request body to the buffer.
#
# If adding the chunk would push the buffered body past the configured size
# limit, the chunk is dropped and a "413 Request Entity Too Large" response is
# sent, but only once: nothing is sent when the handler is already closing.
#
# @param chunk [String] body bytes delivered by the parser
# @return [String, nil] the body buffer after appending, or nil when the chunk
#   was rejected
def on_body(chunk)
  if @body.bytesize + chunk.bytesize <= @body_size_limit
    return @body << chunk
  end

  send_response_and_close("413 Request Entity Too Large", {}, "Too large") unless closing?
  nil
end
#on_close ⇒ Object
369 370 371 |
# File 'lib/fluent/plugin/in_http.rb', line 369

# Connection-closed callback: removes this handler from the registry it added
# itself to in the constructor.
#
# @return [Object] whatever +@km.delete+ returns
def on_close
  @km.delete(self)
end
#on_headers_complete(headers) ⇒ Object
386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 |
# File 'lib/fluent/plugin/in_http.rb', line 386

# Parser callback: digests the request headers once they are all available.
#
# Decides whether the connection is kept alive (default for HTTP/1.1,
# overridable through the Connection header), copies every header into @env
# as an HTTP_* entry, and records Content-Type, Content-Encoding, Origin, the
# first X-Forwarded-For address and the CORS preflight headers. It then
# answers an Expect header: "100 Continue" when the declared size is
# acceptable, "413" when it is too large, "417" for any other expectation.
#
# All header-derived state is reset first, so nothing from a previous request
# on the same keep-alive connection leaks into this one.
# NOTE(review): @remote_addr is still overwritten by X-Forwarded-For and is
# not restored for later requests on the same connection - confirm whether
# that is intended.
#
# @param headers [#each_pair] header name => value pairs from the parser
# @return [Object, nil] the result of the Expect response, or nil when no
#   Expect header was present
def on_headers_complete(headers)
  expect = nil
  size = nil

  @keep_alive = (@parser.http_version == [1, 1])

  @env = {}
  @content_type = ""
  @content_encoding = ""
  @origin = nil
  @access_control_request_method = nil
  @access_control_request_headers = nil

  headers.each_pair {|k,v|
    @env["HTTP_#{k.tr('-','_').upcase}"] = v
    case k
    when /\AExpect\z/i
      expect = v
    when /\AContent-Length\z/i
      size = v.to_i
    when /\AContent-Type\z/i
      @content_type = v
    when /\AContent-Encoding\z/i
      @content_encoding = v
    when /\AConnection\z/i
      if /close/i.match?(v)
        @keep_alive = false
      elsif /Keep-alive/i.match?(v)
        @keep_alive = true
      end
    when /\AOrigin\z/i
      @origin = v
    when /\AX-Forwarded-For\z/i
      # For multiple X-Forwarded-For headers. Use first header value.
      v = v.first if v.is_a?(Array)
      @remote_addr = v.split(",").first
    when /\AAccess-Control-Request-Method\z/i
      @access_control_request_method = v
    when /\AAccess-Control-Request-Headers\z/i
      @access_control_request_headers = v
    end
  }

  if expect
    if expect == '100-continue'
      # A body of exactly @body_size_limit bytes is accepted by #on_body, so
      # it must be accepted here as well.
      if !size || size <= @body_size_limit
        send_response_nobody("100 Continue", {})
      else
        send_response_and_close("413 Request Entity Too Large", {}, "Too large")
      end
    else
      send_response_and_close("417 Expectation Failed", {}, "")
    end
  end
end
#on_message_begin ⇒ Object
382 383 384 |
# File 'lib/fluent/plugin/in_http.rb', line 382

# Parser callback: a new request starts, so begin with an empty body buffer.
#
# @return [String] the fresh, empty buffer
def on_message_begin
  # Unary plus guarantees an unfrozen string, because #on_body appends to
  # this buffer in place.
  @body = +''
end
#on_message_complete ⇒ Object
492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 |
# File 'lib/fluent/plugin/in_http.rb', line 492

# Parser callback: the whole request has been received, so process it.
#
# Steps, in order:
# 1. Do nothing if a closing response was already sent.
# 2. Delegate GET and OPTIONS requests to their dedicated handlers.
# 3. Reject the request with 403 if CORS is configured and the Origin is not
#    allowed.
# 4. Decode a gzip/deflate body; answer "400 Bad Request" if that fails.
# 5. Build the parameter hash from the query string, the body (by
#    Content-Type) and the collected HTTP_* headers.
# 6. Hand path and parameters to the callback, then send its response,
#    keeping the connection open only when keep-alive applies.
#
# @return [Object, nil] the result of the response that was sent; nil when
#   the request was skipped or rejected
def on_message_complete
  return if closing?

  if @parser.http_method == 'GET'
    return handle_get_request()
  end

  if @parser.http_method == 'OPTIONS'
    return handle_options_request()
  end

  # CORS check
  # ==========
  # For every incoming request, we check if we have some CORS
  # restrictions and allow listed origins through @cors_allow_origins.
  # If origin is empty, it's likely a server-to-server request and considered safe.
  unless @cors_allow_origins.nil?
    unless @cors_allow_origins.include?('*') || include_cors_allow_origin || @origin.nil?
      send_response_and_close(RES_403_STATUS, {'Connection' => 'close'}, "")
      return
    end
  end

  # Content Encoding
  # =================
  # Decode payload according to the "Content-Encoding" header.
  # For now, we only support 'gzip' and 'deflate'.
  begin
    if @content_encoding == 'gzip'
      @body = Zlib::GzipReader.new(StringIO.new(@body)).read
    elsif @content_encoding == 'deflate'
      @body = Zlib::Inflate.inflate(@body)
    end
  rescue => e
    @log.warn 'fails to decode payload', error: e.to_s
    # The status line is spelled out, like the 413/417 responses elsewhere in
    # this class, so this error path does not depend on a constant that is
    # not among the class's documented RES_* constants.
    send_response_and_close("400 Bad Request", {}, "")
    return
  end

  @env['REMOTE_ADDR'] = @remote_addr if @remote_addr

  uri = URI.parse(@parser.request_url)
  # Parse the query string once; it feeds both the plain parameters and the
  # optional QUERY_* entries below.
  query_params = parse_query(uri.query)
  params = query_params.dup

  if @format_name != 'default'
    params[EVENT_RECORD_PARAMETER] = @body
  elsif /^application\/x-www-form-urlencoded/.match?(@content_type)
    params.update parse_query(@body)
  elsif @content_type =~ /^multipart\/form-data; boundary=(.+)/
    boundary = WEBrick::HTTPUtils.dequote($1)
    params.update WEBrick::HTTPUtils.parse_form_data(@body, boundary)
  elsif /^application\/json/.match?(@content_type)
    params['json'] = @body
  elsif /^application\/csp-report/.match?(@content_type)
    params['json'] = @body
  elsif /^application\/msgpack/.match?(@content_type)
    params['msgpack'] = @body
  elsif /^application\/x-ndjson/.match?(@content_type)
    params['ndjson'] = @body
  end
  path_info = uri.path

  if @add_query_params
    query_params.each_pair {|k,v|
      params["QUERY_#{k.tr('-','_').upcase}"] = v
    }
  end

  params.merge!(@env)
  @env.clear

  code, header, body = @callback.call(path_info, params)
  body = body.to_s
  # Headers are added below, so work on a copy when the callback returned a
  # frozen hash.
  header = header.dup if header.frozen?

  unless @cors_allow_origins.nil?
    if @cors_allow_origins.include?('*')
      header['Access-Control-Allow-Origin'] = '*'
    elsif include_cors_allow_origin
      header['Access-Control-Allow-Origin'] = @origin
      if @cors_allow_credentials
        header["Access-Control-Allow-Credentials"] = true
      end
    end
  end

  if @keep_alive
    header['Connection'] = 'Keep-Alive'
    send_response(code, header, body)
  else
    send_response_and_close(code, header, body)
  end
end
#on_read(data) ⇒ Object
373 374 375 376 377 378 379 380 |
# File 'lib/fluent/plugin/in_http.rb', line 373

# Data-received callback: resets the idle counter and feeds the bytes to the
# HTTP parser, which in turn triggers the on_* parser callbacks.
#
# Any StandardError raised while parsing or handling the request is logged as
# a warning (with backtrace) and the connection is closed.
#
# @param data [String] bytes read from the connection
# @return [Object] the parser's result, or the result of closing the
#   connection after an error
def on_read(data)
  begin
    @idle = 0
    @parser << data
  rescue => error
    @log.warn "unexpected error", error: error.to_s
    @log.warn_backtrace
    @io.close
  end
end
#on_write_complete ⇒ Object
594 595 596 |
# File 'lib/fluent/plugin/in_http.rb', line 594

# Write-finished callback: closes the connection if the response just written
# was sent through #send_response_and_close.
#
# @return [Object, nil] the result of closing the connection, or nil when it
#   stays open
def on_write_complete
  return unless @next_close

  @io.close
end
#parse_query(query) ⇒ Object
647 648 649 |
# File 'lib/fluent/plugin/in_http.rb', line 647

# Decodes an application/x-www-form-urlencoded string into a Hash.
#
# Keys and values are decoded as binary (ASCII-8BIT) strings. When a key
# occurs more than once, the last occurrence wins.
#
# @param query [String, nil] the encoded string; nil means "no query"
# @return [Hash{String => String}] decoded pairs, empty for nil input
# @raise [ArgumentError] if URI.decode_www_form rejects the input
def parse_query(query)
  return {} if query.nil?

  URI.decode_www_form(query, Encoding::ASCII_8BIT).to_h
end
#send_response(code, header, body) ⇒ Object
607 608 609 610 611 612 613 614 615 616 617 618 619 |
# File 'lib/fluent/plugin/in_http.rb', line 607

# Writes a complete HTTP/1.1 response (status line, headers, body) to the
# connection.
#
# Content-Length (the body's byte size) and Content-Type ("text/plain") are
# filled in unless the caller supplied them. The defaults go into a copy, so
# the caller's hash is never modified. This keeps a frozen hash usable and
# stops a reused hash from carrying a stale Content-Length into a later
# response.
#
# @param code [String] status code and reason phrase, e.g. "200 OK"
# @param header [Hash] response headers; left untouched
# @param body [String] response body
# @return [Object] the result of writing the body
def send_response(code, header, body)
  fields = header.dup
  fields['Content-Length'] ||= body.bytesize
  fields['Content-Type'] ||= 'text/plain'

  data = +"HTTP/1.1 #{code}\r\n"
  fields.each_pair {|k,v|
    data << "#{k}: #{v}\r\n"
  }
  data << "\r\n"

  @io.write(data)
  @io.write(body)
end
#send_response_and_close(code, header, body) ⇒ Object
598 599 600 601 |
# File 'lib/fluent/plugin/in_http.rb', line 598

# Sends a complete response and flags the connection to be closed once the
# write has finished (see #on_write_complete).
#
# @param code [String] status code and reason phrase
# @param header [Hash] response headers
# @param body [String] response body
# @return [true]
def send_response_and_close(code, header, body)
  send_response(code, header, body)
  # Only flag the close here; the data is still being written.
  @next_close = true
end
#send_response_nobody(code, header) ⇒ Object
621 622 623 624 625 626 627 628 |
# File 'lib/fluent/plugin/in_http.rb', line 621

# Writes a body-less response (status line and headers only) to the
# connection. No Content-Length or Content-Type is added.
#
# @param code [String] status code and reason phrase, e.g. "100 Continue"
# @param header [#each_pair] response headers
# @return [Object] the result of the write
def send_response_nobody(code, header)
  status_line = "HTTP/1.1 #{code}\r\n"
  header_lines = header.each_pair.map {|name, value| "#{name}: #{value}\r\n" }

  @io.write(status_line + header_lines.join + "\r\n")
end
#step_idle ⇒ Object
365 366 367 |
# File 'lib/fluent/plugin/in_http.rb', line 365

# Advances the idle counter by one. The counter is reset to zero whenever
# data arrives (see #on_read).
#
# @return [Integer] the new counter value
def step_idle
  @idle += 1
end