Class: MogileFS::MogileFS

Inherits:
Client
  • Object
show all
Includes:
Bigfile
Defined in:
lib/mogilefs/mogilefs.rb

Overview

MogileFS file manipulation client.

Create a new instance that will communicate with these trackers:
hosts = %w[192.168.1.69:6001 192.168.1.70:6001]
mg = MogileFS::MogileFS.new(:domain => 'test', :hosts => hosts)

# Stores "A bunch of text to store" into 'some_key' with a class of 'text'.
mg.store_content('some_key', 'text', "A bunch of text to store")

# Retrieve data from 'some_key' as a string
data = mg.get_file_data('some_key')

# Store the contents of 'image.jpeg' into the key 'my_image' with a
# class of 'image'.
mg.store_file('my_image', 'image', 'image.jpeg')

# Store the contents of 'image.jpeg' into the key 'my_image' with a
# class of 'image' using an open IO object.
File.open('image.jpeg') { |fp| mg.store_file('my_image', 'image', fp) }

# Retrieve the contents of 'my_image' into '/path/to/huge_file'
# without slurping the entire contents into memory:
mg.get_file_data('my_image', '/path/to/huge_file')

# Remove the key 'my_image' and 'some_key'.
mg.delete('my_image')
mg.delete('some_key')

Instance Attribute Summary collapse

Attributes inherited from Client

#backend, #hosts

Instance Method Summary collapse

Methods inherited from Client

#err, #errstr, #readonly?, #reload

Constructor Details

#initialize(args = {}) ⇒ MogileFS

Creates a new MogileFS::MogileFS instance. args must include a key :domain specifying the domain of this client.

Optional parameters for args:

:get_file_data_timeout => Numeric

See get_file_data_timeout

:new_file_max_time => Numeric

See new_file_max_time

:fail_timeout => Numeric

Delay before retrying a failed tracker backend. Defaults to 5 seconds.

:timeout => Numeric

Timeout for tracker backend responses. Defaults to 3 seconds.

:connect_timeout => Integer

Timeout for connecting to a tracker. Defaults to 3 seconds.

Raises:

  • (ArgumentError)


71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/mogilefs/mogilefs.rb', line 71

# Creates a new client bound to args[:domain].
#
# The domain is validated first so a missing :domain fails fast with
# ArgumentError, before any HTTP connection objects are allocated.
#
# args - options Hash:
#   :domain                (required) domain of keys for this client
#   :get_file_data_timeout open/read timeout for GETs (default 5)
#   :new_file_max_time     open/read timeout for PUTs (default 3600.0)
#   :db_backend            if given, used as the backend and the client
#                          is marked read-only
#   anything else is passed through to the parent initializer via +super+
#
# Raises ArgumentError if :domain is missing.
def initialize(args = {})
  @domain = args[:domain]
  raise ArgumentError, "you must specify a domain" unless @domain

  @get_file_data_timeout = args[:get_file_data_timeout] || 5
  @new_file_max_time = args[:new_file_max_time] || 3600.0
  @nhp_get = nhp_new('get')
  @nhp_get.open_timeout = @nhp_get.read_timeout = @get_file_data_timeout
  @nhp_put = nhp_new('put')
  @nhp_put.open_timeout = @nhp_put.read_timeout = @new_file_max_time

  @backend = args[:db_backend]
  if @backend
    # a directly supplied database backend is only used for reading
    @readonly = true
  else
    # NOTE(review): presumably Client#initialize builds the tracker
    # backend from :hosts — confirm in MogileFS::Client
    super
  end
end

Instance Attribute Details

#domainObject

The domain of keys for this MogileFS client.



35
36
37
# File 'lib/mogilefs/mogilefs.rb', line 35

# Returns the domain of keys handled by this client.
def domain; @domain; end

#get_file_data_timeoutObject

The timeout for get_file_data (per-read() system call). Defaults to five seconds.



39
40
41
# File 'lib/mogilefs/mogilefs.rb', line 39

# Returns the per-read() timeout, in seconds, used by get_file_data.
def get_file_data_timeout; @get_file_data_timeout; end

#new_file_max_timeObject

The maximum allowed time for creating a new_file. Defaults to 1 hour.



42
43
44
# File 'lib/mogilefs/mogilefs.rb', line 42

# Returns the maximum time, in seconds, allowed for creating a new file.
def new_file_max_time; @new_file_max_time; end

Instance Method Details

#delete(key) ⇒ Object

Removes key.



373
374
375
376
377
378
# File 'lib/mogilefs/mogilefs.rb', line 373

# Deletes +key+ from this client's domain.
#
# Returns true once the backend call returns. Raises
# MogileFS::ReadOnlyError on a read-only client; backend errors propagate.
def delete(key)
  if readonly?
    raise MogileFS::ReadOnlyError
  end
  request = { :domain => @domain, :key => key }
  @backend.delete(request)
  true
end

#each_file_info(prefix = "", args = nil) ⇒ Object

Enumerates keys matching prefix and yields a file_info hash for each one.



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/mogilefs/mogilefs.rb', line 109

# Enumerates keys starting with +prefix+ and yields a cleaned-up
# file_info Hash (see #file_info_cleanup) for each one, in listing order.
#
# args may contain:
#   :after   - start listing after this key
#   :limit   - maximum number of keys to enumerate
#   :devices - also request device ids in each file_info
#
# Returns the number of keys listed, whether enumeration stopped because
# the listing was exhausted or because :limit was reached.
# Raises ArgumentError when no block is given.
#
# If the pipelined file_info requests fail part-way through a page
# (pipeline/socket error, timeout, truncated response), the backend
# connection is reset and only the keys of the *current* page that have
# not arrived yet are re-requested. The listing position and the :limit
# accounting are not advanced a second time, so no keys are skipped.
# Any other error resets the backend and is re-raised.
def each_file_info(prefix = "", args = nil)
  # FIXME: there's a lot of duplicate code from list_keys_verbose here...
  raise ArgumentError, "need block" unless block_given?
  ordered = ready = nil
  on_file_info = lambda do |info|
    Hash === info or raise info
    file_info_cleanup(info)

    # deal with trackers with multiple queryworkers responding out-of-order:
    # buffer responses in +ready+ and only yield once the next key in
    # listing order (+ordered+ is reversed, so it is ordered[-1]) arrived
    ready[info["key"]] = info
    while info = ready.delete(ordered[-1])
      ordered.pop
      yield info
    end
  end

  nr = 0
  opts = { :domain => @domain }
  opts[:devices] = 1 if args && args[:devices]
  after = args ? args[:after] : nil
  limit = args ? args[:limit] : nil

  begin
    keys, after = list_keys(prefix, after, limit || 1000)
    return nr unless keys && keys[0]
    ordered = keys.reverse
    ready = {}
    nr += keys.size
    limit -= keys.size if limit

    # retry only this page's dispatch; +ordered+ and +ready+ persist so
    # already-buffered responses are still flushed in order
    begin
      keys.each do |key|
        opts[:key] = key
        @backend.pipeline_dispatch(:file_info, opts, &on_file_info)
      end
      @backend.pipeline_wait
    rescue MogileFS::PipelineError, SystemCallError,
           MogileFS::RequestTruncatedError,
           MogileFS::UnreadableSocketError,
           MogileFS::InvalidResponseError, # truncated response
           MogileFS::Timeout
      @backend.shutdown
      # keys neither yielded nor buffered, back in listing order
      keys = (ordered - ready.keys).reverse!
      retry
    end
  end while limit == nil || limit > 0
  nr
rescue
  @backend.shutdown
  raise
end

#each_key(prefix = "", args = nil, &block) ⇒ Object

Enumerates keys, optionally limited to those starting with prefix. args may also specify an optional :limit and :after (default: nil).



93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/mogilefs/mogilefs.rb', line 93

# Enumerates keys beginning with +prefix+, handing +block+ to #list_keys
# for every page of results.
#
# args may contain :after (start after this key) and :limit (maximum
# number of keys); both default to nil. Pages hold at most 1000 keys,
# or fewer when the remaining limit is smaller. Always returns nil.
def each_key(prefix = "", args = nil, &block)
  after = args ? args[:after] : nil
  limit = args ? args[:limit] : nil

  keys, after = list_keys(prefix, after, limit || 1000, &block)
  while keys && keys[0]
    limit -= keys.size if limit
    break unless limit.nil? || limit > 0
    keys, after = list_keys(prefix, after, limit || 1000, &block)
  end
  nil
end

#exist?(key) ⇒ Boolean

Returns true if key exists, false if not

Returns:

  • (Boolean)


231
232
233
234
235
236
237
238
239
240
241
# File 'lib/mogilefs/mogilefs.rb', line 231

# Returns true if +key+ exists in this client's domain, or false if the
# tracker answers with MogileFS::Backend::UnknownKeyError. Any other
# error object returned by the backend is raised.
def exist?(key)
  rv = @backend.get_paths(:key => key, :domain => @domain, :ruby_no_raise => true)
  return true if Hash === rv
  return false if MogileFS::Backend::UnknownKeyError === rv
  raise rv
end

#file_debug(args) ⇒ Object

Given an Integer fid or String key and domain, thoroughly searches the database for all occurrences of a particular fid.

Use this sparingly, this command hits the master database numerous times and is very expensive. This is not for production use, only troubleshooting and debugging.

Searches for fid=666:

client.file_debug(666)

Search for key=foo using the default domain for this object:

client.file_debug("foo")

Search for key=foo in domain=“bar”:

client.file_debug(:key => "foo", :domain => "bar")


533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
# File 'lib/mogilefs/mogilefs.rb', line 533

# Runs the tracker's file_debug command for a fid or key.
#
# args - an Integer fid, a String key (both looked up in this client's
#        domain), or a Hash such as { :key => "foo", :domain => "bar" }.
#
# Returns the backend's result Hash with numeric fields (names ending in
# _fid, _length, _devid, ...) converted to Integers and *devids fields
# split into Integer arrays.
def file_debug(args)
  query = case args
          when Integer then { "fid" => args }
          when String then { "key" => args }
          else args
          end
  opts = { :domain => query[:domain] || @domain }
  opts.merge!(query)

  result = @backend.file_debug(opts)
  result.each_pair do |field, value|
    case field
    when /_(?:classid|devcount|dmid|fid|length|
          nexttry|fromdevid|failcount|flags|devid|type)\z/x
      result[field] = value.to_i
    when /devids\z/
      result[field] = value.split(','.freeze).map! { |id| id.to_i }
    end
  end
  result
end

#file_info(key, args = nil) ⇒ Object

Return metadata about a file as a hash. Returns the domain, class, length, devcount, etc. as keys. Optionally, device ids (not paths) can be returned as well if :devices is specified and true.

This should only be used for informational purposes, and not usually for dynamically serving files.

mg.file_info("bar")

Returns:

{
  "domain" => "foo",
  "key" => "bar",
  "class" => "default",
  "devcount" => 2,
  "length" => 666
}


501
502
503
504
505
# File 'lib/mogilefs/mogilefs.rb', line 501

# Returns the tracker's metadata for +key+ as a Hash, passed through
# file_info_cleanup. Device ids are requested only when args[:devices]
# is truthy.
def file_info(key, args = nil)
  request = { :domain => @domain, :key => key }
  request[:devices] = 1 if args && args[:devices]
  file_info_cleanup(@backend.file_info(request))
end

#file_info_cleanup(rv) ⇒ Object

:nodoc:



507
508
509
510
511
512
# File 'lib/mogilefs/mogilefs.rb', line 507

# Normalizes a file_info Hash in place and returns it: "fid", "length"
# and "devcount" become Integers (0 when absent), and a comma-separated
# "devids" value, if present, becomes an Array of Integers.
def file_info_cleanup(rv) # :nodoc:
  rv["fid"] = rv["fid"].to_i
  rv["length"] = rv["length"].to_i
  rv["devcount"] = rv["devcount"].to_i
  if (devids = rv["devids"])
    rv["devids"] = devids.split(','.freeze).map { |id| id.to_i }
  end
  rv
end

#get_file_data(key, dst = nil, copy_length = nil, src_offset = nil) ⇒ Object

Retrieves the contents of key. If dst is specified, dst should be an IO-like object capable of receiving the write method or a path name. copy_length may be specified to limit the number of bytes to retrieve, and src_offset can be specified to specify the start position of the copy.



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/mogilefs/mogilefs.rb', line 163

# Retrieves the contents of +key+.
#
# key         - key to read
# dst         - optional IO-like object (responding to #write) or path
#               name; when given, data is streamed there
# copy_length - optional number of bytes to read
# src_offset  - optional starting byte offset (default 0 when
#               copy_length is given)
#
# With +dst+, returns the result of streaming into it. With a block,
# yields the open reader and returns the block's value. Otherwise
# returns the body as a String, trying each path in turn.
#
# Raises MogileFS::Error when every path fails, including when the key
# has no paths at all; previously that case crashed with NoMethodError
# on nil.
def get_file_data(key, dst = nil, copy_length = nil, src_offset = nil)
  paths = get_paths(key)
  range = nil
  if src_offset || copy_length
    src_offset ||= 0
    # HTTP byte ranges are inclusive; a nil end means "through EOF"
    range_end = copy_length ? src_offset + copy_length - 1 : nil
    range = [ src_offset, range_end ]
  end

  if dst
    sock = MogileFS::HTTPReader.first(paths, @get_file_data_timeout, range)
    sock.stream_to(dst)
  elsif block_given?
    sock = MogileFS::HTTPReader.first(paths, @get_file_data_timeout, range)
    yield(sock)
  else
    errors = []
    paths.each do |path|
      uri = URI.parse(path)
      get = Net::HTTP::Get.new(uri.path)
      get["range"] = "bytes=#{range[0]}-#{range[1]}" if range
      begin
        res = @nhp_get.request(uri, get)
        case res.code.to_i
        when 200, 206
          return res.body
        end
        errors << "#{path} - #{res.message} (#{res.class})"
      rescue => e
        errors << "#{path} - #{e.message} (#{e.class})"
      end
    end
    detail = errors.empty? ? "no paths available for key" : errors.join(', ')
    raise MogileFS::Error, "all paths failed with GET: #{detail}", []
  end
ensure
  sock.close if sock && ! sock.closed?
end

#get_paths(key, *args) ⇒ Object

Get the paths (URLs as strings) for key. If args is specified, it may contain:

  • :noverify -> boolean; when true the tracker skips verifying the paths (default: true)

  • :pathcount -> a positive integer of URLs to retrieve (default: 2)

  • :zone -> “alt” or nil (default: nil)

:noverify defaults to true because this client library is capable of verifying paths for readability itself. It is also faster and more reliable to verify paths on the client.



210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/mogilefs/mogilefs.rb', line 210

# Returns the paths (URL strings) stored for +key+.
#
# Accepts either positional (noverify, zone) arguments or a single Hash
# with :noverify, :zone and :pathcount. Only an explicit +false+ for
# :noverify makes the tracker verify paths; anything else sends 1.
# Delegates to the backend's _get_paths when it provides one.
def get_paths(key, *args)
  noverify, zone = args[0], args[1]
  opts = { :domain => @domain, :key => key, :noverify => noverify, :zone => zone }
  if Hash === noverify
    hash_args = noverify
    opts[:noverify] = hash_args[:noverify]
    opts[:zone] = hash_args[:zone] if hash_args[:zone]
    count = hash_args[:pathcount]
    opts[:pathcount] = count.to_i if count
  end

  opts[:noverify] = false.equal?(opts[:noverify]) ? 0 : 1
  if @backend.respond_to?(:_get_paths)
    return @backend._get_paths(opts)
  end

  res = @backend.get_paths(opts)
  1.upto(res['paths'].to_i).map { |i| res["path#{i}"] }
end

#get_uris(key, *args) ⇒ Object

Get the URIs for key (paths) as URI::HTTP objects



244
245
246
# File 'lib/mogilefs/mogilefs.rb', line 244

# Like get_paths, but parses every returned path into a URI object.
# Arguments are forwarded to get_paths unchanged.
def get_uris(key, *args)
  paths = get_paths(key, *args)
  paths.map! do |path|
    URI.parse(path)
  end
end

#list_keys(prefix = "", after = nil, limit = 1000, &block) ⇒ Object

Lists up to limit keys starting with prefix, beginning after the key after. If after is nil, the list starts at the beginning.



418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
# File 'lib/mogilefs/mogilefs.rb', line 418

# Lists up to +limit+ keys beginning with +prefix+, starting after key
# +after+ (or from the beginning when nil).
#
# If a block is given and takes exactly one argument, it receives each
# key; any other block arity gets verbose (key, length, devcount)
# callbacks via list_keys_verbose.
#
# Returns [keys, next_after], or nil when nothing matches. Other backend
# errors are raised.
def list_keys(prefix = "", after = nil, limit = 1000, &block)
  if @backend.respond_to?(:_list_keys)
    return @backend._list_keys(domain, prefix, after, limit, &block)
  end

  res = @backend.list_keys(:domain => domain, :prefix => prefix,
                           :after => after, :limit => limit,
                           :ruby_no_raise => true)
  return if MogileFS::Backend::NoneMatchError === res
  raise res if MogileFS::Error === res

  count = res['key_count'].to_i
  keys = 1.upto(count).map { |n| res["key_#{n}"] }
  if block && 1 == block.arity
    keys.each { |k| block.call(k) }
  elsif block
    list_keys_verbose(keys, block)
  end

  [ keys, res['next_after'] ]
end

#list_keys_verbose(keys, block) ⇒ Object

:nodoc:



440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
# File 'lib/mogilefs/mogilefs.rb', line 440

# Calls +block+ with (key, length, devcount) for each of +keys+, in the
# order given.
#
# Normally this pipelines one file_info request per key. Responses may
# arrive out of order; they are buffered in +ready+ and released in key
# order. Fallbacks:
# * MogileFS::Backend::UnknownCommandError (trackers older than 2.45
#   without file_info): reset the socket, then use get_paths plus
#   paths_size for every key.
# * Connection-level failures (pipeline/socket errors, truncated or
#   invalid responses, timeouts): reset the backend and re-dispatch only
#   the keys not yet received. +ordered+ and +ready+ survive the retry.
#   NOTE(review): this retry is unbounded.
# * Anything else: reset the backend and re-raise.
def list_keys_verbose(keys, block) # :nodoc:
  # emulate the MogileFS::Mysql interface, slowly...
  # +ordered+ holds keys not yet yielded, reversed so the next one is last
  ordered = keys.reverse
  # responses received ahead of their turn, by key
  ready = {}
  on_file_info = lambda do |info|
    Hash === info or raise info
    file_info_cleanup(info)

    # deal with trackers with multiple queryworkers responding out-of-order
    ready[info["key"]] = info
    while info = ready.delete(ordered[-1])
      block.call(ordered.pop, info["length"], info["devcount"])
    end
  end
  opts = { :domain => @domain }
  begin
    keys.each do |key|
      opts[:key] = key
      @backend.pipeline_dispatch(:file_info, opts, &on_file_info)
    end
    @backend.pipeline_wait
  rescue MogileFS::Backend::UnknownCommandError # MogileFS < 2.45
    @backend.shutdown # reset the socket
    # NOTE(review): iterates all of +keys+; if any were already yielded
    # before this error, they would be yielded again
    args = { :pathcount => 0x7fffffff }
    keys.each do |key|
      paths = get_paths(key, args)
      block.call(key, paths_size(paths), paths.size)
    end
  rescue MogileFS::PipelineError, SystemCallError,
         MogileFS::RequestTruncatedError,
         MogileFS::UnreadableSocketError,
         MogileFS::InvalidResponseError, # truncated response
         MogileFS::Timeout
    @backend.shutdown
    # re-request only keys neither yielded nor buffered, in original order
    keys = (ordered - ready.keys).reverse!
    retry
  rescue
    @backend.shutdown
    raise
  end
end

#new_file(key, args = nil, bytes = nil) ⇒ Object

Creates a new file key in the domain of this object.

bytes is the expected size of the file if known in advance

It operates like File.open(…, “w”) and may take an optional block, yielding an IO-like object with support for the methods documented in MogileFS::NewFile::Writer.

This atomically replaces existing data stored as key when the block exits or when the returned object is closed.

args may contain the following options:

:content_length => Integer

This has the same effect as the (deprecated) bytes parameter.

:largefile => :stream, :content_range or :tempfile

See MogileFS::NewFile for more information on this

:class => String

The MogileFS storage class of the object.

:content_md5 => String, Proc, or :trailer

This can either be a Base64-encoded String, a Proc object, or the :trailer symbol. If given a String, it will be used as the Content-MD5 HTTP header. If given the :trailer symbol, this library will automatically generate a Content-MD5 HTTP trailer. If given a Proc object, this Proc object should give a Base64-encoded string which can be used as the Content-MD5 HTTP trailer when called at the end of the request.

Keep in mind most HTTP servers do not support HTTP trailers, so passing a String is usually the safest way to use this.

:info => Hash

This is an empty hash that will be filled with the same information as MogileFS::MogileFS#file_info.

Additionally, it contains one additional key: :uris, an array of URI::HTTP objects to the stored destinations



293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
# File 'lib/mogilefs/mogilefs.rb', line 293

# Creates (or atomically replaces) +key+ via the tracker's create_open.
#
# key   - key to create
# args  - nil, a String storage class, or an options Hash (:domain,
#         :class, :create_open_args, :info, :content_length, ...). A Hash
#         is also merged into the options handed to MogileFS::NewFile.
# bytes - deprecated expected size; used as :content_length unless that
#         option is already set
#
# With a block, yields the MogileFS::NewFile and returns its #commit
# result. Without a block, returns the NewFile.
# Raises MogileFS::ReadOnlyError on a read-only client,
# MogileFS::EmptyPathError when no destination path came back, and
# MogileFS::UnsupportedPathError for a path that is not http://.
def new_file(key, args = nil, bytes = nil) # :yields: file
  raise MogileFS::ReadOnlyError if readonly?
  # multi_dest: ask for several destinations (see the dev_count branch)
  opts = { :key => key, :multi_dest => 1 }
  case args
  when Hash
    opts[:domain] = args[:domain]
    # extra create_open parameters; open_args stays nil for non-Hash args
    open_args = args[:create_open_args]
    # the "default" class is never sent explicitly
    klass = args[:class] and "default" != klass and opts[:class] = klass
  when String
    opts[:class] = args if "default" != args
  end
  opts[:domain] ||= @domain
  res = @backend.create_open(open_args ? open_args.merge(opts) : opts)
  opts[:nhp_put] = @nhp_put

  # list of [devid, path] pairs to upload to
  dests = if dev_count = res['dev_count'] # multi_dest succeeded
    (1..dev_count.to_i).map { |i| [res["devid_#{i}"], res["path_#{i}"]] }
  else # single destination returned
    # 0x0040:  d0e4 4f4b 2064 6576 6964 3d31 2666 6964  ..OK.devid=1&fid
    # 0x0050:  3d33 2670 6174 683d 6874 7470 3a2f 2f31  =3&path=http://1
    # 0x0060:  3932 2e31 3638 2e31 2e37 323a 3735 3030  92.168.1.72:7500
    # 0x0070:  2f64 6576 312f 302f 3030 302f 3030 302f  /dev1/0/000/000/
    # 0x0080:  3030 3030 3030 3030 3033 2e66 6964 0d0a  0000000003.fid..

    [[res['devid'], res['path']]]
  end

  # caller-supplied options override the ones built above.
  # NOTE(review): this includes :domain — an explicit nil :domain in args
  # replaces @domain in the options given to NewFile.
  opts.merge!(args) if Hash === args
  opts[:backend] = @backend
  opts[:fid] = res['fid']
  opts[:content_length] ||= bytes if bytes
  opts[:new_file_max_time] ||= @new_file_max_time
  opts[:start_time] = MogileFS.now
  # record the storage class in the caller's :info Hash, if one was given
  info = opts[:info] and info["class"] = klass || "default"

  # dispatch on the first destination's path (nil when none came back)
  case (dests[0][1] rescue nil)
  when %r{\Ahttp://}
    http_file = MogileFS::NewFile.new(dests, opts)
    if block_given?
      yield http_file
      return http_file.commit # calls create_close
    else
      return http_file
    end
  when nil, ''
    raise MogileFS::EmptyPathError,
          "Empty path for mogile upload res=#{res.inspect}"
  else
    raise MogileFS::UnsupportedPathError,
          "paths '#{dests.inspect}' returned by backend is not supported"
  end
end

#nhp_new(name) ⇒ Object

:nodoc:



552
553
554
555
556
557
558
559
560
# File 'lib/mogilefs/mogilefs.rb', line 552

# Builds a MogileFS::NHP (persistent HTTP client) named +name+.
#
# When Net::HTTP::Persistent::VERSION is 3.0 or newer, the name is
# passed as a :name option. Otherwise it is passed positionally. The
# positional form is also used when the version constant cannot be
# resolved (NameError).
def nhp_new(name) # :nodoc:
  options_style = Net::HTTP::Persistent::VERSION.to_f >= 3.0
  options_style ? MogileFS::NHP.new(:name => name) : MogileFS::NHP.new(name)
rescue NameError
  MogileFS::NHP.new(name)
end

#paths_size(paths) ⇒ Object

:nodoc:



411
412
413
414
# File 'lib/mogilefs/mogilefs.rb', line 411

# Returns the size of a file given its +paths+, using
# MogileFS::PathsSize. That helper is loaded on first use only.
def paths_size(paths) # :nodoc:
  require "mogilefs/paths_size"
  sizer = MogileFS::PathsSize
  sizer.call(paths)
end

#rename(from, to) ⇒ Object

Renames the key named from to the key named to.



394
395
396
397
398
399
# File 'lib/mogilefs/mogilefs.rb', line 394

# Renames key +from+ to +to+ within this client's domain.
#
# Returns nil. Raises MogileFS::ReadOnlyError on a read-only client;
# backend errors propagate.
def rename(from, to)
  if readonly?
    raise MogileFS::ReadOnlyError
  end
  request = { :domain => @domain, :from_key => from, :to_key => to }
  @backend.rename(request)
  nil
end

#size(key) ⇒ Object

Returns the size of key.



402
403
404
405
406
407
408
409
# File 'lib/mogilefs/mogilefs.rb', line 402

# Returns the size of +key+ in bytes.
#
# Uses the backend's _size when it provides one, and otherwise the
# "length" from file_info. If the tracker lacks the file_info command
# (UnknownCommandError), the size is computed from the key's paths via
# paths_size.
def size(key)
  if @backend.respond_to?(:_size)
    @backend._size(domain, key)
  else
    begin
      file_info(key)["length"].to_i
    rescue MogileFS::Backend::UnknownCommandError
      paths_size(get_paths(key))
    end
  end
end

#sleep(duration) ⇒ Object

Asks the tracker backend to sleep for duration; only used for testing.



389
390
391
# File 'lib/mogilefs/mogilefs.rb', line 389

# Sends a sleep command for +duration+ to the tracker backend and
# returns the backend's reply. Only used for testing.
def sleep(duration) # :nodoc:
  request = { :duration => duration }
  @backend.sleep(request)
end

#store_content(key, klass, content, opts = nil) ⇒ Object

Stores content into key in class klass, where content is a String. This atomically replaces existing data stored as key.



359
360
361
362
363
364
365
366
367
368
369
370
# File 'lib/mogilefs/mogilefs.rb', line 359

# Stores +content+ as +key+ with storage class +klass+, atomically
# replacing any existing data stored as +key+.
#
# content - a String, appended to the new file, or a
#           MogileFS::Util::StoreContent, which is streamed instead
# klass   - storage class name; used only when it is a String
# opts    - optional Hash of new_file options. It is no longer modified.
#           Previously :class was written into it, so a reused Hash
#           leaked a stale class into later calls. The copy is shallow,
#           so an :info Hash is still shared and filled in by new_file.
#
# Returns the value of new_file for a block.
# Raises MogileFS::ReadOnlyError on a read-only client.
def store_content(key, klass, content, opts = nil)
  raise MogileFS::ReadOnlyError if readonly?
  if String === klass
    opts = opts ? opts.merge(:class => klass) : { :class => klass }
  end

  new_file(key, opts) do |mfp|
    if content.is_a?(MogileFS::Util::StoreContent)
      mfp.streaming_io = content
    else
      mfp << content
    end
  end
end

#store_file(key, klass, file, opts = nil) ⇒ Object

Copies the contents of file into key in class klass. file can be either a path name (String or Pathname object) or an IO-like object that responds to #read or #readpartial. Returns the size of the file stored. This atomically replaces existing data stored as key.



350
351
352
353
354
355
# File 'lib/mogilefs/mogilefs.rb', line 350

# Copies +file+ into +key+ with storage class +klass+, atomically
# replacing any existing data stored as +key+.
#
# file  - a path name (String or Pathname) or an IO-like object
#         responding to #read or #readpartial; handed to the writer's
#         big_io=
# klass - storage class name; used only when it is a String
# opts  - optional Hash of new_file options. It is no longer modified.
#         Previously :class was written into it, so a reused Hash leaked
#         a stale class into later calls. The copy is shallow, so an
#         :info Hash is still shared and filled in by new_file.
#
# Returns the value of new_file for a block.
# Raises MogileFS::ReadOnlyError on a read-only client.
def store_file(key, klass, file, opts = nil)
  raise MogileFS::ReadOnlyError if readonly?
  if String === klass
    opts = opts ? opts.merge(:class => klass) : { :class => klass }
  end

  new_file(key, opts) { |mfp| mfp.big_io = file }
end

#updateclass(key, newclass) ⇒ Object

Updates key to newclass



381
382
383
384
385
386
# File 'lib/mogilefs/mogilefs.rb', line 381

# Changes the storage class of +key+ to +newclass+.
#
# Returns true once the backend call returns. Raises
# MogileFS::ReadOnlyError on a read-only client; backend errors propagate.
def updateclass(key, newclass)
  if readonly?
    raise MogileFS::ReadOnlyError
  end
  request = { :domain => @domain, :key => key, :class => newclass }
  @backend.updateclass(request)
  true
end