Module: HTTP_Spew::ClassMethods
- Included in:
- HTTP_Spew
- Defined in:
- lib/http_spew/class_methods.rb
Instance Method Summary collapse
-
#done_early(ready, failed, requests) ⇒ Object
:nodoc:.
-
#error_all(requests, error) ⇒ Object
:nodoc:.
- #wait(need, requests, timeout) ⇒ Object
-
#wait_mt(need, requests, timeout) ⇒ Object
Returns an array of requests that are complete, including those that have errored out.
-
#wait_nonblock!(need, requests) ⇒ Object
Returns an array of requests that are complete, including those that have errored out.
- #with_timeout(t) ⇒ Object
Instance Method Details
#done_early(ready, failed, requests) ⇒ Object
:nodoc:
8 9 10 11 12 13 14 15 16 17 |
# File 'lib/http_spew/class_methods.rb', line 8
#
# Wraps up a wait that ended before every request finished.
#
# +failed+ is appended to +ready+ (mutating +ready+). Any member of
# +requests+ that is still absent from +ready+ is passed to +error_all+
# with an HTTP_Spew::ConnectionReset and appended as well. Duplicates are
# then dropped in place and the same +ready+ array is returned.
def done_early(ready, failed, requests) # :nodoc:
  ready.concat(failed)
  unfinished = requests - ready
  # error_all returns the array it was given, so it can be appended directly
  ready.concat(
    error_all(unfinished, HTTP_Spew::ConnectionReset.new("prematurely terminated"))
  ) unless unfinished.empty?
  ready.uniq!
  ready
end
#error_all(requests, error) ⇒ Object
:nodoc:
4 5 6 |
# File 'lib/http_spew/class_methods.rb', line 4
#
# Assigns +error+ to every request in +requests+ whose +error+ attribute is
# currently nil or false; requests that already carry an error keep it.
# Returns +requests+ itself.
def error_all(requests, error) # :nodoc:
  requests.each do |req|
    req.error = error unless req.error
  end
end
#wait(need, requests, timeout) ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/http_spew/class_methods.rb', line 89
#
# Repeatedly resumes every request in +requests+ until +need+ of them have
# completed, nothing is left to wait on, or +timeout+ (seconds) runs out.
#
# A request whose +resume+ returns a Symbol is treated as still in progress
# and is polled via +do_poll+; any other return value counts as completion.
# An exception raised by +resume+ is stored in the request's +error+ and the
# request is counted as failed.
#
# Returns an array holding completed requests, then failed ones, then any
# that were still pending when time ran out. Only the still-pending ones
# are given an HTTP_Spew::TimeoutError (via +error_all+); requests that
# already completed are left untouched. If +need+ is reached, the result of
# +done_early+ is returned instead.
#
# NOTE(review): +do_poll+ is defined outside this listing; it is assumed to
# return a Hash keyed by request, or nil/false on timeout -- confirm.
def wait(need, requests, timeout)
  ready, failed = [], []
  pollset = {}
  t = [ timeout ] # one-element array so with_timeout can decrement it in place
  begin
    requests.each do |req|
      begin
        case state = req.resume
        when Symbol # :wait_writable, :wait_readable
          pollset[req] = state
        else
          (ready << req).size == need and
            return done_early(ready, failed, requests)
          pollset.delete(req)
        end
      rescue => e
        req.error = e
        failed << req
        pollset.delete(req)
      end
    end
    break if pollset.empty?

    # snapshot taken before polling, in case do_poll alters pollset
    busy = pollset.keys
    rv = with_timeout(t) { do_poll(pollset, t[0]) } or break
    # Array#uniq (not uniq!) so the working list is never nil: uniq! returns
    # nil whenever there was nothing to deduplicate.
  end while t[0] > 0.0 && requests = rv.keys.concat(busy).uniq

  ready.concat(failed)
  # Only requests that neither completed nor failed have timed out.
  timed_out = requests - ready
  unless timed_out.empty?
    error = HTTP_Spew::TimeoutError.new("request timed out")
    ready.concat(error_all(timed_out, error))
    ready.uniq!
  end
  ready
end
#wait_mt(need, requests, timeout) ⇒ Object
Returns an array of requests that are complete, including those that have errored out. If need is fulfilled, it closes all incomplete requests.
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/http_spew/class_methods.rb', line 54
#
# Runs every request in its own thread and waits until +need+ of them have
# returned an Array, all of them have finished, or +timeout+ (seconds)
# elapses.
#
# Each worker reports completion by writing its index, packed as a 16-bit
# value, to a pipe. Workers report failures too, so a request that raises
# can no longer leave the waiter blocked on the pipe.
#
# A worker result that is an Array counts as success; anything else counts
# as failure. When the result is an exception it is stored in the request's
# +error+, unless one is already set.
#
# Returns an array of completed requests, then failed ones, then requests
# still running at the deadline; the latter are given an
# HTTP_Spew::TimeoutError via +error_all+. If +need+ is reached, the result
# of +done_early+ is returned instead. Both pipe ends are always closed.
def wait_mt(need, requests, timeout)
  ready, failed = [], []
  r, w = IO.pipe
  t = [ timeout ] # one-element array so with_timeout can decrement it in place
  active = requests.each_with_index.map do |req, i|
    Thread.new do
      outcome = begin
        req.run(timeout)
      rescue => err
        err
      end
      begin
        # Always notify the waiter, on success and on failure alike.
        w.write([ i ].pack("v".freeze))
      rescue IOError, SystemCallError
        # The waiter already returned and closed the pipe.
      end
      outcome
    end
  end

  while ready.size + failed.size < requests.size
    # nil/false here means the deadline passed with no worker reporting in;
    # reading the pipe in that case would block indefinitely.
    with_timeout(t) { r.wait_readable(t[0]) } or break
    req_idx = r.read(2).unpack("v".freeze)[0]
    thr = active[req_idx]
    with_timeout(t) { thr.join(t[0]) }
    rv = thr.value
    req = requests[req_idx]
    if Array === rv
      ready << req
    else
      req.error ||= rv if Exception === rv
      failed << req
    end
    ready.size == need and return done_early(ready, failed, requests)
  end

  ready.concat(failed)
  pending = requests - ready
  unless pending.empty?
    error = HTTP_Spew::TimeoutError.new("request timed out")
    ready.concat(error_all(pending, error))
  end
  ready
ensure
  w.close if w
  r.close if r
end
#wait_nonblock!(need, requests) ⇒ Object
Returns an array of requests that are complete, including those that have errored out. Incomplete requests remain in requests. If need is fulfilled, it closes all incomplete requests and returns all requests.
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/http_spew/class_methods.rb', line 23
#
# Resumes each request in +requests+ once, without blocking.
#
# Requests whose +resume+ returns a Symbol are still in progress and stay
# in +requests+. Requests that complete, or whose +resume+ raises, are
# removed from +requests+; a raised exception is stored in the request's
# +error+.
#
# Returns nil when nothing completed or failed, otherwise an array of the
# completed requests followed by the failed ones. If the number of completed
# requests reaches +need+, returns the result of +done_early+ immediately.
def wait_nonblock!(need, requests)
  ready = []
  failed = []
  requests.delete_if do |req|
    begin
      if Symbol === req.resume # :wait_writable, :wait_readable
        false
      else
        ready << req
        return done_early(ready, failed, requests) if ready.size == need
        true
      end
    rescue => e
      req.error = e
      # << returns the (truthy) array, so the failed request is removed too
      failed << req
    end
  end
  ready.concat(failed)
  ready.empty? ? nil : ready
end
#with_timeout(t) ⇒ Object
43 44 45 46 47 48 49 |
# File 'lib/http_spew/class_methods.rb', line 43
#
# Yields to the block and charges its running time against a time budget.
#
# +t+ is an indexable container whose first element is the number of
# seconds remaining. Afterwards, even if the block raises, +t[0]+ is
# reduced by the elapsed monotonic-clock time and floored at 0.0.
# Returns whatever the block returns.
def with_timeout(t)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
ensure
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  remaining = t[0] - elapsed
  t[0] = remaining < 0 ? 0.0 : remaining
end