Class: OMGF::VerifyPaths

Inherits:
Object
  • Object
show all
Defined in:
lib/omgf/verify_paths.rb

Overview

This module makes HEAD requests with reusable HTTP connections to verify paths. This is faster than having the mogilefsd tracker verifying paths, and the client could have broken routing to some the storage nodes the mogilefsd tracker/monitor can see.

Defined Under Namespace

Classes: HeadSock

Instance Method Summary collapse

Constructor Details

#initialize(logger) ⇒ VerifyPaths

Returns a new instance of VerifyPaths.



96
97
98
99
100
101
102
103
# File 'lib/omgf/verify_paths.rb', line 96

def initialize(logger)
  @pool = Hash.new { |hash,host_port| hash[host_port] = [] }
  @logger = logger
  @finishq = Queue.new
  @finisher = nil
  @lock = Mutex.new
  @pid = $$
end

Instance Method Details

#error(msg) ⇒ Object



105
106
107
# File 'lib/omgf/verify_paths.rb', line 105

def error(msg)
  @logger.error(msg) if @logger
end

#finish(pollset) ⇒ Object

recover any unfinished URLs in pollset asynchronously in the background



188
189
190
191
192
193
194
195
# File 'lib/omgf/verify_paths.rb', line 188

def finish(pollset) # :nodoc:
  @finishq.push(pollset)
  @lock.synchronize do
    unless @finisher && @finisher.alive?
      @finisher = Thread.new { finisher }
    end
  end
end

#finisher(timeout = 10000) ⇒ Object

this runs in a background thread to cleanup all the requests that didn’t finish quickly enough



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/omgf/verify_paths.rb', line 138

def finisher(timeout = 10000)
  begin
    pollset = @finishq.pop # park here when idle

    while ready = Kgio.poll(pollset.dup, timeout)
      ready.each_key do |sock|
        sock.retry_ok = false
        # try to return good sockets back to the pool
        iter_check([], sock, pollset)
      end

      # try to stuff the pollset as much as possible for further looping
      while more = (@finishq.pop(true) rescue nil)
        pollset.merge!(more)
      end
    end

    # connections timed out, kill them
    pollset.each_key { |sock| sock.close }
  rescue => err
    error("#{err.message} (#{err.class})")
  end while true
end

#iter_check(ok, sock, pollset) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/omgf/verify_paths.rb', line 109

def iter_check(ok, sock, pollset)
  rv = sock.poll_iter(pollset)
  case rv
  when Symbol # in progress
  when Array
    code = rv[0].to_i
    if 200 == code
      ok << sock.uri
    elsif code >= 100 && code <= 999
      error("HEAD #{sock.uri} returned HTTP code: #{code}")
    else
      error("HEAD #{sock.uri} returned #{rv.inspect} (kcar bug?)")
    end
    sock_put(sock)
  when nil # premature EOF
    error("HEAD #{sock.uri} hit socket EOF")
  else
    # exception or some other error return value...
    if rv.respond_to?(:message)
      error("HEAD #{sock.uri} error: #{rv.message} (#{rv.class})")
    else
      error("HEAD #{sock.uri} error (#{rv.class}): #{rv.inspect}")
    end
  end
  rv
end

#key_for(uri) ⇒ Object

returns a string key for the connection pool



198
199
200
# File 'lib/omgf/verify_paths.rb', line 198

def key_for(uri)
  "#{uri.host}:#{uri.port}"
end

#sock_get(uri) ⇒ Object

initializes a cached connection for uri or creates a new one



203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/omgf/verify_paths.rb', line 203

def sock_get(uri)
  key = key_for(uri)

  # detect forks and prevent sharing of connected sockets across processes
  @lock.synchronize do
    if @pid != $$
      @pid = $$
      @pool.clear
    end
  end

  while sock = @lock.synchronize { @pool[key].pop }
    begin
      # check if sock is still alive and idle
      # :wait_readable is good here
      break if sock.kgio_tryread(1) == :wait_readable
    rescue
      # ignore socket errors, we'll just give them a new socket
      # socket should've been idle, but it was not (or EOFed on us)
      # give them a new one
    end
    sock.close
  end

  sock ||= HeadSock.start(uri)
  sock.http_init(uri)
  sock
rescue
  # we'll return nil on any errors
end

#sock_put(sock) ⇒ Object

returns an idle socket to the pool



235
236
237
238
239
240
# File 'lib/omgf/verify_paths.rb', line 235

def sock_put(sock)
  key = key_for(sock.uri)
  sock.http_reusable? and @lock.synchronize { @pool[key] << sock }
rescue => err
  error("HTTP reuse check failed: #{err.message} (#{err.class})")
end

#verify(uris, count, timeout) ⇒ Object

reorders URIs based on response time This is the main method of this class



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/omgf/verify_paths.rb', line 164

def verify(uris, count, timeout)
  tout = (timeout * 1000).to_i
  pollset = {}
  ok = []

  uris.each do |uri|
    sock = sock_get(uri) and iter_check(ok, sock, pollset)
  end

  while ok.size < count && tout > 0 && ! pollset.empty?
    t0 = Time.now
    ready = Kgio.poll(pollset.dup, tout) or break
    tout -= ((Time.now - t0) * 1000).to_i

    ready.each_key do |sock|
      iter_check(ok, sock, pollset)
    end
  end

  finish(pollset) unless pollset.empty?
  [ok, uris - ok] # good URLs first
end