Module: MogileFS::Util

Included in:
Bigfile, HTTPFile, MogileFS, Network, Socket
Defined in:
lib/mogilefs/util.rb

Defined Under Namespace

Classes: StoreContent

Constant Summary

CHUNK_SIZE =
65536

Instance Method Summary

Instance Method Details

#sysrwloop(io_rd, io_wr, filter = nil) ⇒ Object

Copies large files while avoiding GC thrashing as much as possible: writes the contents of io_rd into io_wr, running each chunk through filter if one is given (e.g. a Proc object). The filter must accept a String argument (and return a String), and must also accept nil, which signals EOF (possibly returning a final String, or nil). This can be used to filter I/O through a Zlib::Inflate or Digest::MD5 object.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/mogilefs/util.rb', line 14

# Copies everything readable from +io_rd+ into +io_wr+ using raw
# sysread/syswrite, intended for large transfers where GC churn matters.
#
# If +filter+ is given it is called with each chunk read and must return
# the String to write. Once +io_rd+ hits EOF it is called one final time
# with nil, and any non-empty result is written too (e.g. the tail of a
# Zlib::Inflate stream).
#
# Side effects: flushes +io_wr+ (and +io_rd+, ignoring any error), and
# sets +sync+ to true on both IOs.
# Returns the total number of bytes written to +io_wr+.
def sysrwloop(io_rd, io_wr, filter = nil)
  total = 0
  # One scratch String handed to sysread as its output buffer, so every
  # chunk is read into the same object rather than a new allocation.
  scratch = ' ' * CHUNK_SIZE
  begin
    io_rd.flush # may be needed for sockets/pipes; failure is harmless
  rescue
    nil
  end
  io_wr.flush
  io_wr.sync = true
  io_rd.sync = true

  loop do
    begin
      data = io_rd.sysread(CHUNK_SIZE, scratch)
    rescue Errno::EAGAIN, Errno::EINTR
      # nothing readable yet (or interrupted): wait for input, then re-read
      IO.select([io_rd], nil, nil, nil)
      retry
    rescue EOFError
      break
    end
    data = filter.call(data) if filter
    total += syswrite_full(io_wr, data)
  end

  if filter
    # a nil argument tells the filter that the input is exhausted
    tail = filter.call(nil)
    total += syswrite_full(io_wr, tail) if tail && tail.length != 0
  end
  total
end

#syswrite_full(io_wr, buf, timeout = nil) ⇒ Object

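Writes the entire contents of buf to io_wr, retrying after short writes. When timeout (in seconds) is given, waits for io_wr to become writable with IO.select and raises MogileFS::Timeout if the timeout expires before everything has been written.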



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/mogilefs/util.rb', line 46

# Writes the entire contents of +buf+ to +io_wr+ with syswrite, looping
# over short writes until every byte has gone out.
#
# +timeout+ (seconds) is optional. When given, any short write or
# EAGAIN/EINTR causes an IO.select wait for writability. The time spent
# in those waits is deducted from +timeout+, and MogileFS::Timeout is
# raised once it is exhausted. Without a timeout, EAGAIN/EINTR leads to
# an unbounded IO.select wait before retrying.
#
# Returns the number of bytes written (buf.bytesize on success).
def syswrite_full(io_wr, buf, timeout = nil)
  written = 0
  loop do
    begin
      w = io_wr.syswrite(buf)
      written += w
      # syswrite counts bytes, so compare and slice by bytes: the
      # character-based size/[] mis-handle multibyte (e.g. UTF-8) strings
      return written if w == buf.bytesize
      buf = buf.byteslice(w..-1)

      # a short syswrite means the next syswrite will likely block
      # inside the interpreter.  so force an IO.select on it so we can
      # timeout there if one was specified
      raise Errno::EAGAIN if timeout
    rescue Errno::EAGAIN, Errno::EINTR
      t0 = Time.now if timeout
      ready = IO.select(nil, [io_wr], nil, timeout)
      if timeout
        timeout -= (Time.now - t0)
        # IO.select returns nil when the wait itself timed out; give up even
        # if clock granularity left a tiny non-negative remainder, otherwise
        # a retried syswrite could block past the deadline
        raise MogileFS::Timeout, 'syswrite_full timeout' if ready.nil? || timeout < 0
      end
    end
  end
  # should never get here
end