Class: HttpReactor::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/http_reactor/client.rb

Overview

An HTTP client that uses the Reactor pattern.

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(requests = [], handler_proc = nil, options = {}, &block) ⇒ Client

Create a new HttpReactor client that will request the given URIs.

Parameters:

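  • requests: An array of request objects. Each request must respond to uri, returning a URI object that supplies the host, port and request path.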

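  • handler_proc: A Proc that will be called with the response and context. If a block is given it is used instead of this argument; if neither is given, a default handler is used.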

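  • session_request_callback: A class that implements the session request callback interface found in the HttpCore library. (Note: the constructor signature shown above does not accept this parameter; a SessionRequestCallback is created internally for each request.)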

  • options: A hash of configuration options. See below.

The options hash may include the following options

  • :so_timeout: (default = 5 seconds)

  • :connection_timeout: The HTTP connection timeout (default = 10 seconds)

  • :socket_buffer_size: The buffer size (defaults to 8Kb)

  • :stale_connection_check: (defaults to false)

  • :tcp_nodelay: (defaults to true)

  • :user_agent: The user agent string to send (defaults to “JRubyHttpReactor”)

  • :event_listener: A class (not an instance) that implements the EventListener interface from the org.apache.http.nio.protocol package. It is instantiated with no arguments.



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/http_reactor/client.rb', line 157

# Create a new HttpReactor client and synchronously execute the given
# requests.
#
# The constructor starts an I/O reactor on a background thread, opens one
# connection per request, blocks the calling thread until the shared
# CountDownLatch (initialised to <tt>requests.length</tt>) reaches zero,
# and then shuts the reactor down.
#
# Parameters:
# * <tt>requests</tt>: An array of request objects (each is expected to
#   respond to +uri+; see process_requests).
# * <tt>handler_proc</tt>: A Proc used to handle responses. A block, when
#   given, takes precedence; when neither is given the default handler
#   proc is used.
# * <tt>options</tt>: A hash of configuration options. If
#   <tt>:event_listener</tt> is set it must be a class; it is instantiated
#   with no arguments and attached to the client handler.
#
# NOTE(review): this sets <tt>Thread.abort_on_exception = true</tt>, which
# is a process-wide setting rather than one scoped to the reactor thread.
def initialize(requests=[], handler_proc=nil, options={}, &block)
  handler_proc = block if block_given?
  handler_proc ||= default_handler_proc

  initialize_options(options)

  params = build_params(options)

  io_reactor = DefaultConnectingIOReactor.new(2, params)

  httpproc = BasicHttpProcessor.new
  httpproc.add_interceptor(RequestContent.new)
  httpproc.add_interceptor(RequestTargetHost.new)
  httpproc.add_interceptor(RequestConnControl.new)
  httpproc.add_interceptor(RequestUserAgent.new)
  httpproc.add_interceptor(RequestExpectContinue.new)

  # We are going to use this object to synchronize between the
  # I/O event and main threads
  request_counter = java.util.concurrent.CountDownLatch.new(requests.length)

  handler = BufferingHttpClientHandler.new(
    httpproc,
    RequestExecutionHandler.new(request_counter, handler_proc),
    org.apache.http.impl.DefaultConnectionReuseStrategy.new,
    params
  )

  handler.event_listener = options[:event_listener].new if options[:event_listener]

  io_event_dispatch = DefaultClientIOEventDispatch.new(handler, params)

  Thread.abort_on_exception = true
  # The reactor's event loop runs on its own thread so that this thread
  # can dispatch the requests and then wait on the latch.
  Thread.new do
    begin
      io_reactor.execute(io_event_dispatch)
    rescue java.io.InterruptedIOException
      puts "Interrupted"
    rescue java.io.IOException => e
      puts "I/O error in reactor execution thread: #{e.message}"
    end
  end

  begin
    process_requests(requests, io_reactor, request_counter)

    # Block until all connections signal
    # completion of the request execution
    request_counter.await
  ensure
    # Always stop the reactor, even if dispatching or waiting raised;
    # otherwise the reactor thread and its resources would be left running.
    io_reactor.shutdown
  end
end

Class Method Details

.process_requests(requests, io_reactor, request_counter) ⇒ Object



219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/http_reactor/client.rb', line 219

# Ask the I/O reactor to open one connection per request.
#
# For every request this builds an attachment hash (+:host+, +:path+,
# +:request+, +:io_reactor+) and passes it to <tt>io_reactor.connect</tt>
# together with the remote socket address and a new SessionRequestCallback
# wrapping +request_counter+.
#
# Parameters:
# * <tt>requests</tt>: Enumerable of request objects responding to +uri+;
#   the URI must provide +host+, +port+ and +request_uri+.
# * <tt>io_reactor</tt>: The connecting I/O reactor used to open sessions.
# * <tt>request_counter</tt>: The latch handed to each session callback.
#
# Returns whatever <tt>requests.each</tt> returns.
#
# NOTE(review): the HttpHost is built from the host name only, so a
# non-default port is not carried in +:host+ -- confirm this is intended.
def self.process_requests(requests, io_reactor, request_counter)
  requests.each do |req|
    target = req.uri
    session_data = {
      :host => HttpHost.new(target.host),
      :path => target.request_uri,
      :request => req,
      :io_reactor => io_reactor
    }
    remote_address = java.net.InetSocketAddress.new(target.host, target.port)
    completion_callback = SessionRequestCallback.new(request_counter)
    io_reactor.connect(remote_address, nil, session_data, completion_callback)
  end
end

Instance Method Details

#process_requests(requests, io_reactor, request_counter) ⇒ Object



215
216
217
# File 'lib/http_reactor/client.rb', line 215

# Instance-level entry point that forwards, unchanged, to the class method
# HttpReactor::Client.process_requests and returns its result.
def process_requests(requests, io_reactor, request_counter)
  HttpReactor::Client.process_requests(
    requests,
    io_reactor,
    request_counter
  )
end