Class: OMGF::VerifyPaths
- Inherits:
-
Object
- Object
- OMGF::VerifyPaths
- Defined in:
- lib/omgf/verify_paths.rb
Overview
This module makes HEAD requests with reusable HTTP connections to verify paths. This is faster than having the mogilefsd tracker verifying paths, and the client could have broken routing to some the storage nodes the mogilefsd tracker/monitor can see.
Defined Under Namespace
Classes: HeadSock
Instance Method Summary collapse
- #error(msg) ⇒ Object
-
#finish(pollset) ⇒ Object
recover any unfinished URLs in pollset asynchronously in the background.
-
#finisher(timeout = 10000) ⇒ Object
this runs in a background thread to cleanup all the requests that didn’t finish quickly enough.
-
#initialize(logger) ⇒ VerifyPaths
constructor
A new instance of VerifyPaths.
- #iter_check(ok, sock, pollset) ⇒ Object
-
#key_for(uri) ⇒ Object
returns a string key for the connection pool.
-
#sock_get(uri) ⇒ Object
initializes a cached connection for
urior creates a new one. -
#sock_put(sock) ⇒ Object
returns an idle socket to the pool.
-
#verify(uris, count, timeout) ⇒ Object
reorders URIs based on response time This is the main method of this class.
Constructor Details
#initialize(logger) ⇒ VerifyPaths
Returns a new instance of VerifyPaths.
96 97 98 99 100 101 102 103 |
# File 'lib/omgf/verify_paths.rb', line 96 def initialize(logger) @pool = Hash.new { |hash,host_port| hash[host_port] = [] } @logger = logger @finishq = Queue.new @finisher = nil @lock = Mutex.new @pid = $$ end |
Instance Method Details
#error(msg) ⇒ Object
105 106 107 |
# File 'lib/omgf/verify_paths.rb', line 105 def error(msg) @logger.error(msg) if @logger end |
#finish(pollset) ⇒ Object
recover any unfinished URLs in pollset asynchronously in the background
188 189 190 191 192 193 194 195 |
# File 'lib/omgf/verify_paths.rb', line 188 def finish(pollset) # :nodoc: @finishq.push(pollset) @lock.synchronize do unless @finisher && @finisher.alive? @finisher = Thread.new { finisher } end end end |
#finisher(timeout = 10000) ⇒ Object
this runs in a background thread to cleanup all the requests that didn’t finish quickly enough
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/omgf/verify_paths.rb', line 138 def finisher(timeout = 10000) begin pollset = @finishq.pop # park here when idle while ready = Kgio.poll(pollset.dup, timeout) ready.each_key do |sock| sock.retry_ok = false # try to return good sockets back to the pool iter_check([], sock, pollset) end # try to stuff the pollset as much as possible for further looping while more = (@finishq.pop(true) rescue nil) pollset.merge!(more) end end # connections timed out, kill them pollset.each_key { |sock| sock.close } rescue => err error("#{err.message} (#{err.class})") end while true end |
#iter_check(ok, sock, pollset) ⇒ Object
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/omgf/verify_paths.rb', line 109 def iter_check(ok, sock, pollset) rv = sock.poll_iter(pollset) case rv when Symbol # in progress when Array code = rv[0].to_i if 200 == code ok << sock.uri elsif code >= 100 && code <= 999 error("HEAD #{sock.uri} returned HTTP code: #{code}") else error("HEAD #{sock.uri} returned #{rv.inspect} (kcar bug?)") end sock_put(sock) when nil # premature EOF error("HEAD #{sock.uri} hit socket EOF") else # exception or some other error return value... if rv.respond_to?(:message) error("HEAD #{sock.uri} error: #{rv.message} (#{rv.class})") else error("HEAD #{sock.uri} error (#{rv.class}): #{rv.inspect}") end end rv end |
#key_for(uri) ⇒ Object
returns a string key for the connection pool
198 199 200 |
# File 'lib/omgf/verify_paths.rb', line 198 def key_for(uri) "#{uri.host}:#{uri.port}" end |
#sock_get(uri) ⇒ Object
initializes a cached connection for uri or creates a new one
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 |
# File 'lib/omgf/verify_paths.rb', line 203 def sock_get(uri) key = key_for(uri) # detect forks and prevent sharing of connected sockets across processes @lock.synchronize do if @pid != $$ @pid = $$ @pool.clear end end while sock = @lock.synchronize { @pool[key].pop } begin # check if sock is still alive and idle # :wait_readable is good here break if sock.kgio_tryread(1) == :wait_readable rescue # ignore socket errors, we'll just give them a new socket # socket should've been idle, but it was not (or EOFed on us) # give them a new one end sock.close end sock ||= HeadSock.start(uri) sock.http_init(uri) sock rescue # we'll return nil on any errors end |
#sock_put(sock) ⇒ Object
returns an idle socket to the pool
235 236 237 238 239 240 |
# File 'lib/omgf/verify_paths.rb', line 235 def sock_put(sock) key = key_for(sock.uri) sock.http_reusable? and @lock.synchronize { @pool[key] << sock } rescue => err error("HTTP reuse check failed: #{err.message} (#{err.class})") end |
#verify(uris, count, timeout) ⇒ Object
reorders URIs based on response time This is the main method of this class
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
# File 'lib/omgf/verify_paths.rb', line 164 def verify(uris, count, timeout) tout = (timeout * 1000).to_i pollset = {} ok = [] uris.each do |uri| sock = sock_get(uri) and iter_check(ok, sock, pollset) end while ok.size < count && tout > 0 && ! pollset.empty? t0 = Time.now ready = Kgio.poll(pollset.dup, tout) or break tout -= ((Time.now - t0) * 1000).to_i ready.each_key do |sock| iter_check(ok, sock, pollset) end end finish(pollset) unless pollset.empty? [ok, uris - ok] # good URLs first end |