Class: LogStash::Inputs::HttpClient
- Inherits:
-
Base
- Object
- Base
- LogStash::Inputs::HttpClient
- Defined in:
- lib/logstash/inputs/httpclient.rb
Instance Method Summary collapse
- #register ⇒ Object
-
#run(queue) ⇒ Object
Polls the configured URL over a keep-alive HTTP connection and pushes the decoded events onto the queue.
Instance Method Details
#register ⇒ Object
78 79 80 |
# File 'lib/logstash/inputs/httpclient.rb', line 78
#
# Parses the configured @url once and keeps the result in @uri, so the
# polling code can read its host, port, scheme and request_uri.
#
# @return [URI::Generic] the parsed URI (also stored in @uri)
def register
  @uri = URI(@url)
end
#run(queue) ⇒ Object
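Polls the configured URL in an endless loop over a keep-alive HTTP connection, decodes each 200 response body with the configured codec, and pushes the resulting events onto the queue. Statistics and the batch id of the previous batch are sent back to the server as request headers.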
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/logstash/inputs/httpclient.rb', line 82
#
# Polls @uri with HTTP GET requests in an endless loop over one keep-alive
# connection and pushes every decoded event onto +queue+.
#
# Protocol, as implemented below:
# * Request headers X-Logstash-Num-Events, X-Logstash-Http-Request-Total-Secs,
#   X-Logstash-Codec-Parse-Total-Secs and X-Logstash-Queue-Total-Secs carry
#   the statistics of the previous batch (only sent when that batch had events).
# * Request header X-Successful-Batch-Ids acknowledges the previous batch id.
# * Response header X-Messages-Batch-Id names the batch in the response body.
# * Response header X-More-Events-Available == "true" skips the @interval wait.
#
# Any StandardError resets all per-batch state (including the pending batch
# id, so an unprocessed batch is never acknowledged), sleeps, and reconnects.
#
# @param queue [#<<, #length, #max, #num_waiting] destination for events;
#   the original comments describe it as Logstash's bounded input queue
# @return [void] not expected to return under normal operation
def run(queue)
  begin
    # Statistics about the previously processed batch.
    num_events = 0
    queue_time = nil # how long it took to queue the events into logstash
    codec_parse_time = nil
    http_request_time = nil
    batch_id = nil

    @logger.debug("Initializing http connection")

    # Net::HTTP.start creates a connection to the HTTP server and keeps it
    # alive for the duration of the block. An EOFError has been observed when
    # the connection breaks.
    Net::HTTP.start(@uri.host, @uri.port, :use_ssl => @uri.scheme == 'https') do |http|
      # `while true` on purpose: `loop do` would swallow a StopIteration and
      # end the input instead of letting the rescue below reconnect.
      while true
        http_start = Time.now
        sleep_interval = @interval

        @logger.debug("Executing http get request: #{@uri.host}#{@uri.request_uri}")
        request = Net::HTTP::Get.new(@uri.request_uri)

        # num_events is 0 when nothing was processed in the last batch; in
        # that case there are no statistics worth reporting.
        if num_events > 0
          request["X-Logstash-Num-Events"] = num_events.to_s
          request["X-Logstash-Http-Request-Total-Secs"] = http_request_time.to_s
          request["X-Logstash-Codec-Parse-Total-Secs"] = codec_parse_time.to_s
          request["X-Logstash-Queue-Total-Secs"] = queue_time.to_s
        end

        # Acknowledge the previous batch so the server knows those messages
        # were all processed by logstash.
        request["X-Successful-Batch-Ids"] = batch_id if batch_id

        response = http.request(request)

        # Don't wait if the server indicated there are more events ready now.
        sleep_interval = 0 if response["X-More-Events-Available"] == "true"

        # Remember the batch id of this response so it can be ack'd in the
        # next request.
        batch_id = response["X-Messages-Batch-Id"]
        http_request_time = Time.now - http_start

        num_events = 0
        status = response.code.to_i

        # NOTE(review): the status literal was missing from this comparison in
        # the source; 204 (No Content) is assumed from the accompanying comment
        # "HTTP Response code indicated there are no messages" -- confirm.
        if status == 204
          # No messages: skip processing and make sure nothing gets ack'd.
          batch_id = nil
        elsif status != 200
          @logger.warn("Received non-200 response code: #{response.code}. Will not process this response.")
        elsif response.body.nil?
          @logger.warn("Received normal #{response.code} response code, but the response body is empty. Will not process this response.")
        else
          # Normal HTTP 200 OK response body -- decode, then queue.
          codec_start = Time.now
          events = decode_response_events(response, http_request_time)
          num_events = events.length
          codec_parse_time = Time.now - codec_start

          queue_start = Time.now
          events.each { |event| queue << event } # blocks if queue is full
          queue_time = Time.now - queue_start

          # Interpolated rather than concatenated: response["Server"] is nil
          # when the header is absent, and String#+ would raise TypeError
          # after the batch has already been queued.
          server = response["Server"]
          @logger.info(
            "HTTP GET request processed #{events.length} events. " \
            "http_request_secs=#{http_request_time}, codec_parse_secs=#{codec_parse_time}, " \
            "queueing_secs=#{queue_time}, queue_size=#{queue.length}/#{queue.max}, " \
            "queue_num_threads_waiting=#{queue.num_waiting}. " \
            "Server=#{server}. Request=#{@uri.host}#{@uri.request_uri}"
          )
        end

        # TODO: should we provide an option to post back (ack) immediately
        # once we successfully queue all the messages in the response?
        if sleep_interval > 0
          @logger.debug("Waiting #{sleep_interval} seconds before next http request")
          sleep(sleep_interval)
        end
      end
    end
  rescue EOFError => e
    # This seems to happen if the http keepalive times out and the server
    # disconnects.
    @logger.warn("HTTP Connection has closed (EOFError), will reset http client connection and try again.", :exception => e)
    sleep(1)
    retry
  rescue => e
    @logger.warn("Error occurred, resetting http client connection and trying again in 10 seconds.", :exception => e)
    @logger.warn(e.backtrace)
    sleep(10) # TODO: configurable sleep time for errors?
    retry
  end

  @logger.warn("Unexpected: HTTP client has ended")
end

# Decodes the body of +response+ with @codec and returns the resulting events.
# Each event gets a metadata hash under @response_object_name whose entries
# depend on the @include_* settings, and is then passed through +decorate+.
#
# @param response [#body, #code, #to_hash] the HTTP response to decode
# @param http_request_time [Numeric] seconds the HTTP request took
# @return [Array] the decoded and decorated events, in codec order
def decode_response_events(response, http_request_time)
  events = []

  @codec.decode(response.body) do |event|
    metadata = {}
    # Store the headers as a plain hash, not the response object itself.
    metadata["headers"] = response.to_hash if @include_response_headers
    metadata["code"] = response.code.to_i if @include_response_code
    metadata["took_secs"] = http_request_time if @include_http_request_time
    event[@response_object_name] = metadata

    decorate(event)
    events.push(event)
  end

  events
end
private :decode_response_events