Module: HTTP_Spew::ClassMethods

Included in:
HTTP_Spew
Defined in:
lib/http_spew/class_methods.rb

Instance Method Summary collapse

Instance Method Details

#done_early(ready, failed, requests) ⇒ Object

:nodoc:



8
9
10
11
12
13
14
15
16
17
# File 'lib/http_spew/class_methods.rb', line 8

# Finishes a wait early: folds +failed+ into +ready+, then marks every
# request in +requests+ that is in neither list with a ConnectionReset
# error (through error_all) and appends those as well.
#
# +ready+ is mutated in place, de-duplicated, and returned.
def done_early(ready, failed, requests) # :nodoc:
  ready.concat(failed)
  leftover = requests - ready
  if leftover.size > 0
    reset = HTTP_Spew::ConnectionReset.new("prematurely terminated")
    ready.concat(error_all(leftover, reset))
  end
  # uniq! returns nil when nothing was removed, so return +ready+ explicitly
  ready.uniq!
  ready
end

#error_all(requests, error) ⇒ Object

:nodoc:



4
5
6
# File 'lib/http_spew/class_methods.rb', line 4

# Assigns +error+ to each request whose +error+ is currently nil or false;
# requests that already carry an error are left untouched.
#
# Returns +requests+ itself (the result of +each+).
def error_all(requests, error) # :nodoc:
  requests.each do |req|
    req.error = error unless req.error
  end
end

#wait(need, requests, timeout) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/http_spew/class_methods.rb', line 89

# Repeatedly calls +resume+ on each request, polling between passes, until
# +need+ requests have completed, nothing is left to poll, the poll yields
# nothing, or the +timeout+ budget is used up.
#
# A request whose +resume+ returns a Symbol (:wait_writable or
# :wait_readable) is kept in the poll set; any other return value counts as
# completion.  A request whose +resume+ raises gets the exception stored in
# its +error+ and is counted as failed.
#
# Returns an array of completed requests, followed by failed ones, followed
# by any that were still pending when the loop ended.  Only those
# still-pending requests are marked with HTTP_Spew::TimeoutError; requests
# that already completed are left alone.  When +need+ is reached, the result
# of done_early is returned instead.
def wait(need, requests, timeout)
  ready, failed = [], []
  pollset = {}
  t = [ timeout ] # boxed so with_timeout can decrement the budget in place
  loop do
    requests.each do |req|
      begin
        case rv = req.resume
        when Symbol # :wait_writable, :wait_readable
          pollset[req] = rv
        else
          (ready << req).size == need and
            return done_early(ready, failed, requests)
          pollset.delete(req)
        end
      rescue => e
        req.error = e
        failed << req
        pollset.delete(req)
      end
    end
    break if pollset.empty?

    busy = pollset.keys
    # NOTE(review): do_poll is defined elsewhere; presumably it returns a
    # hash keyed by the requests that became ready, or nil/false otherwise.
    polled = with_timeout(t) { do_poll(pollset, t[0]) } or break
    break unless t[0] > 0.0

    # Polled requests go first, then everything else still in the poll set.
    # Non-bang uniq: uniq! returns nil when there is nothing to remove,
    # which would leave +requests+ nil.
    requests = polled.keys.concat(busy).uniq
  end

  ready.concat(failed)
  # Only requests that neither completed nor failed have timed out.
  pending = requests - ready
  unless pending.empty?
    error = HTTP_Spew::TimeoutError.new("request timed out")
    ready.concat(error_all(pending, error))
    ready.uniq!
  end
  ready
end

#wait_mt(need, requests, timeout) ⇒ Object

Returns an array of requests that are complete, including those that have errored out. If need is fulfilled, it closes all incomplete requests.



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/http_spew/class_methods.rb', line 54

# Runs every request in its own thread (calling +run(timeout)+ on it) and
# collects results as the threads report completion.
#
# Each thread writes its request index to an internal pipe once +run+
# returns; the calling thread waits on the read end, bounded by the
# remaining +timeout+ budget.  A thread value that is an Array counts as
# completed; anything else counts as failed.
#
# Returns an array of completed requests, followed by failed ones, followed
# by those still pending when the budget ran out (each marked with
# HTTP_Spew::TimeoutError via error_all).  When +need+ requests have
# completed, the result of done_early is returned instead.  Both pipe ends
# are closed on the way out.
def wait_mt(need, requests, timeout)
  ready, failed = [], []
  r, w = IO.pipe
  active = []
  t = [ timeout ] # boxed so with_timeout can decrement the budget in place
  requests.each_with_index do |req, i|
    active << Thread.new do
      begin
        rv = req.run(timeout)
        w.write([ i ].pack("v".freeze))
        rv
      rescue => err
        # also reached when the pipe was already closed by the caller
        err
      end
    end
  end

  # Checking the count first means an empty +requests+ never waits at all.
  until (ready.size + failed.size) == requests.size
    # wait_readable returns nil on timeout.  Stop then, because a read on
    # the still-empty pipe would block indefinitely.  with_timeout clamps
    # the budget at 0.0, so it can never be observed as negative.
    with_timeout(t) { r.wait_readable(t[0]) } or break
    req_idx = r.read(2).unpack("v".freeze)[0]
    thr = active[req_idx]
    with_timeout(t) { thr.join(t[0]) }
    rv = thr.value
    (Array === rv ? ready : failed) << requests[req_idx]
    ready.size == need and return done_early(ready, failed, requests)
  end

  ready.concat(failed)
  pending = requests - ready
  unless pending.empty?
    error = HTTP_Spew::TimeoutError.new("request timed out")
    ready.concat(error_all(pending, error))
  end
  ready
ensure
  # r/w are nil if IO.pipe itself raised
  w.close if w
  r.close if r
end

#wait_nonblock!(need, requests) ⇒ Object

Returns an array of requests that are complete, including those that have errored out. Incomplete requests remain in requests. If need is fulfilled, it closes all incomplete requests and returns all requests.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/http_spew/class_methods.rb', line 23

# Makes a single non-blocking pass over +requests+, calling +resume+ on
# each one.
#
# A request whose +resume+ returns a Symbol (:wait_writable or
# :wait_readable) stays in +requests+.  Requests that complete, or whose
# +resume+ raises (the exception is stored in +error+), are removed from
# +requests+ in place.
#
# Returns nil when nothing completed or failed, otherwise an array of the
# completed requests followed by the failed ones.  When +need+ requests
# have completed, the result of done_early is returned instead.
def wait_nonblock!(need, requests)
  done = []
  errored = []
  requests.delete_if do |req|
    begin
      outcome = req.resume
      if Symbol === outcome # :wait_writable, :wait_readable
        false
      else
        done << req
        return done_early(done, errored, requests) if done.size == need
        true
      end
    rescue => err
      req.error = err
      errored << req # the (truthy) array also tells delete_if to drop req
    end
  end
  done.concat(errored)
  done.empty? ? nil : done
end

#with_timeout(t) ⇒ Object



43
44
45
46
47
48
49
# File 'lib/http_spew/class_methods.rb', line 43

# Yields to the block and afterwards subtracts the elapsed monotonic time
# from +t[0]+, clamping the result at 0.0.  +t+ is a one-element array so
# the caller sees the updated budget.  The deduction happens in +ensure+,
# so it is applied even when the block raises.
#
# Returns the block's value.
def with_timeout(t)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
ensure
  left = t[0] - (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started)
  t[0] = left < 0 ? 0.0 : left
end