Class: LogStash::Inputs::HttpClient

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/httpclient.rb

Instance Method Summary collapse

Instance Method Details

#registerObject



78
79
80
# File 'lib/logstash/inputs/httpclient.rb', line 78

def register
  @uri = URI(@url)
end

#run(queue) ⇒ Object

def register



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/logstash/inputs/httpclient.rb', line 82

def run(queue)
  begin
    num_events = 0
    queue_time = nil #track an average of how long it's taking to queue events into logstash

    codec_parse_time = nil
    http_request_time = nil
    batch_id = nil
    
    @logger.debug("Initializing http connection")
    #Net::HTTP.start creates a connection to the HTTP server and keeps it alive for the duration. Throws an EOFError when connection breaks.

    Net::HTTP.start @uri.host, @uri.port, :use_ssl => @uri.scheme == 'https' do |http|
      while true
        http_start = Time.now
        sleepIval = @interval
        @logger.debug("Executing http get request: " + @uri.host + @uri.request_uri)
        request = Net::HTTP::Get.new(@uri.request_uri)
        
        #include stats from processing the previous batch as custom headers

        if num_events > 0 #0 if there wasn't anything processed in the last batch

          #send the times/stats (from the previous batch) as custom headers to the server 

          request ["X-Logstash-Num-Events"] = num_events
          request ["X-Logstash-Http-Request-Total-Secs"] = http_request_time
          request ["X-Logstash-Codec-Parse-Total-Secs"] = codec_parse_time
          request ["X-Logstash-Queue-Total-Secs"] = queue_time
        end
        if !batch_id.nil?
          request ["X-Successful-Batch-Ids"] = batch_id #send the previous successfully processed batch id so the server knows those messages were all processed by logstash

        end
        response = http.request request # execute GET request and return Net::HTTPResponse object

        if response["X-More-Events-Available"] == "true"
          sleepIval = 0 #don't wait if the server indicated there are more events ready now

        end
        batch_id = response["X-Messages-Batch-Id"] #save the batch id of the event in this response body, so it can be ack'd in the next http request

        http_request_time = Time.now - http_start
        
        
        #check if events were returned and process them using the defined @codec

        num_events = 0
        if response.code.to_i == @no_messages_response_code
          #HTTP Response code indicated there are no messages. Skip processing response and sleep.

          batch_id = nil
        elsif response.code.to_i != 200
          @logger.warn("Received non-200 response code: #{response.code}. Will not process this response.")
        elsif response.body.nil?
          @logger.warn("Received normal #{response.code} response code, but the response body is empty. Will not process this response.")
        else #normal HTTP 200 OK response body -- process this response

          codec_start = Time.now
          events = Array.new #will store the decoded & decorated event(s) from the message body in this temporary in-memory array

          @codec.decode(response.body) do |event|
            event[@response_object_name] = {}
            if @include_response_headers 
              event[@response_object_name]["headers"] = response
            end
            if @include_response_code 
              event[@response_object_name]["code"] = response.code.to_i
            end
            if @include_http_request_time 
              event[@response_object_name]["took_secs"] = http_request_time
            end
            decorate(event)
            events.push(event)
          end #end decoding events in response body

          num_events = events.length
          codec_parse_time = Time.now - codec_start
          
          #now queue the events to Logstash's bounded input queue and time how long it takes

          queue_start = Time.now
          events.each{ |event|
            #@logger.info("Queueing event")

            queue << event #blocks if queue is full

            #@logger.info("Event has been queued")

          }
          queue_time = Time.now - queue_start
          @logger.info("HTTP GET request processed #{events.length} events. http_request_secs=#{http_request_time}, codec_parse_secs=#{codec_parse_time}, queueing_secs=#{queue_time}, queue_size=#{queue.length}/#{queue.max}, queue_num_threads_waiting=#{queue.num_waiting}. Server=" + response["Server"] + ". Request=" + @uri.host + @uri.request_uri)
        end #end normal processing of HTTP response body

        
        #TODO: should we provide an option to post back (ack) immetiately once we sucecssfully queue all the messages in the response

        
        if(sleepIval > 0)
          @logger.debug("Waiting #{sleepIval} seconds before next http request")
          sleep(sleepIval)
        end
      end #while loop

    end #HTTP keepalive

  rescue => x
    if x.instance_of?(EOFError)
      #this seems to happen if the http keepalive times out and server disconnects

      @logger.warn("HTTP Connection has closed (EOFError), will reset http client connection and try again.", :exception => x)
      sleep(1)
    else
      @logger.warn("Error occurred, resetting http client connection and trying again in 10 seconds.", :exception => x)
      @logger.warn(x.backtrace)
      sleep(10) #todo: configurable sleep time for errors?

    end
    retry
  end #end begin outside of the loop

  @logger.warn("Unexpected: HTTP client has ended")
end