Class: MogileFS::MogileFS

Inherits:
Client
  • Object
show all
Includes:
Bigfile
Defined in:
lib/mogilefs/mogilefs.rb

Overview

MogileFS file manipulation client.

Create a new instance that will communicate with these trackers:
hosts = %w[192.168.1.69:6001 192.168.1.70:6001]
mg = MogileFS::MogileFS.new(:domain => 'test', :hosts => hosts)

# Stores "A bunch of text to store" into 'some_key' with a class of 'text'.
mg.store_content('some_key', 'text', "A bunch of text to store")

# Retrieve data from 'some_key' as a string
data = mg.get_file_data('some_key')

# Store the contents of 'image.jpeg' into the key 'my_image' with a
# class of 'image'.
mg.store_file('my_image', 'image', 'image.jpeg')

# Store the contents of 'image.jpeg' into the key 'my_image' with a
# class of 'image' using an open IO object.
File.open('image.jpeg') { |fp| mg.store_file('my_image', 'image', fp) }

# Retrieve the contents of 'my_image' into '/path/to/huge_file'
# without slurping the entire contents into memory:
mg.get_file_data('my_image', '/path/to/huge_file')

# Remove the key 'my_image' and 'some_key'.
mg.delete('my_image')
mg.delete('some_key')

Instance Attribute Summary collapse

Attributes inherited from Client

#backend, #hosts

Instance Method Summary collapse

Methods inherited from Client

#err, #errstr, #readonly?, #reload

Constructor Details

#initialize(args = {}) ⇒ MogileFS

Creates a new MogileFS::MogileFS instance. args must include a key :domain specifying the domain of this client.

Optional parameters for args:

:get_file_data_timeout => Numeric

See get_file_data_timeout

:new_file_max_time => Numeric

See new_file_max_time

:fail_timeout => Numeric

Delay before retrying a failed tracker backends. Defaults to 5 seconds.

:timeout => Numeric

Timeout for tracker backend responses. Defaults to 3 seconds.

Raises:

  • (ArgumentError)


67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/mogilefs/mogilefs.rb', line 67

def initialize(args = {})
  @domain = args[:domain]

  @get_file_data_timeout = args[:get_file_data_timeout] || 5
  @new_file_max_time = args[:new_file_max_time] || 3600.0

  raise ArgumentError, "you must specify a domain" unless @domain

  if @backend = args[:db_backend]
    @readonly = true
  else
    super
  end
end

Instance Attribute Details

#domainObject

The domain of keys for this MogileFS client.



35
36
37
# File 'lib/mogilefs/mogilefs.rb', line 35

def domain
  @domain
end

#get_file_data_timeoutObject

The timeout for get_file_data (per-read() system call). Defaults to five seconds.



39
40
41
# File 'lib/mogilefs/mogilefs.rb', line 39

def get_file_data_timeout
  @get_file_data_timeout
end

#new_file_max_timeObject

The maximum allowed time for creating a new_file. Defaults to 1 hour.



42
43
44
# File 'lib/mogilefs/mogilefs.rb', line 42

def new_file_max_time
  @new_file_max_time
end

Instance Method Details

#delete(key) ⇒ Object

Removes key.



324
325
326
327
328
329
# File 'lib/mogilefs/mogilefs.rb', line 324

def delete(key)
  raise MogileFS::ReadOnlyError if readonly?

  @backend.delete :domain => @domain, :key => key
  true
end

#each_file_info(prefix = "", args = nil) ⇒ Object

Enumerates keys and yields a file_info hash for each key matched by prefix



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/mogilefs/mogilefs.rb', line 93

def each_file_info(prefix = "", args = nil)
  # FIXME: there's a lot of duplicate code from list_keys_verbose here...
  raise ArgumentError, "need block" unless block_given?
  ordered = ready = nil
  on_file_info = lambda do |info|
    Hash === info or raise info
    file_info_cleanup(info)

    # deal with trackers with multiple queryworkers responding out-of-order
    ready[info["key"]] = info
    while info = ready.delete(ordered[-1])
      ordered.pop
      yield info
    end
  end

  nr = 0
  opts = { :domain => @domain }
  opts[:devices] = 1 if args && args[:devices]
  after = args ? args[:after] : nil
  limit = args ? args[:limit] : nil

  begin
    keys, after = list_keys(prefix, after, limit || 1000)
    return nr unless keys && keys[0]
    ordered = keys.reverse
    ready = {}
    nr += keys.size
    limit -= keys.size if limit

    keys.each do |key|
      opts[:key] = key
      @backend.pipeline_dispatch(:file_info, opts, &on_file_info)
    end
    @backend.pipeline_wait
  rescue MogileFS::PipelineError, SystemCallError,
         MogileFS::RequestTruncatedError,
         MogileFS::UnreadableSocketError,
         MogileFS::InvalidResponseError, # truncated response
         MogileFS::Timeout
    @backend.shutdown
    keys = (ordered - ready.keys).reverse!
    retry
  end while limit == nil || limit > 0
rescue
  @backend.shutdown
  raise
end

#each_key(prefix = "", &block) ⇒ Object

Enumerates keys, limited by optional prefix



83
84
85
86
87
88
89
# File 'lib/mogilefs/mogilefs.rb', line 83

def each_key(prefix = "", &block)
  after = nil
  begin
    keys, after = list_keys(prefix, after, 1000, &block)
  end while keys && keys[0]
  nil
end

#exist?(key) ⇒ Boolean

Returns true if key exists, false if not

Returns:

  • (Boolean)


192
193
194
195
196
197
198
199
200
201
202
# File 'lib/mogilefs/mogilefs.rb', line 192

def exist?(key)
  args = { :key => key, :domain => @domain , :ruby_no_raise => true}
  case rv = @backend.get_paths(args)
  when Hash
    true
  when MogileFS::Backend::UnknownKeyError
    false
  else
    raise rv
  end
end

#file_debug(args) ⇒ Object

Given an Integer fid or String key and domain, thorougly search the database for all occurences of a particular fid.

Use this sparingly, this command hits the master database numerous times and is very expensive. This is not for production use, only troubleshooting and debugging.

Searches for fid=666:

client.file_debug(666)

Search for key=foo using the default domain for this object:

client.file_debug("foo")

Search for key=foo in domain=“bar”:

client.file_debug(:key => "foo", :domain => "bar")


484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
# File 'lib/mogilefs/mogilefs.rb', line 484

def file_debug(args)
  case args
  when Integer then args = { "fid" => args }
  when String then args = { "key" => args }
  end
  opts = { :domain => args[:domain] || @domain }.merge!(args)

  rv = @backend.file_debug(opts)
  rv.each do |k,v|
    case k
    when /_(?:classid|devcount|dmid|fid|length|
          nexttry|fromdevid|failcount|flags|devid|type)\z/x
      rv[k] = v.to_i
    when /devids\z/
      rv[k] = v.split(/,/).map! { |x| x.to_i }
    end
  end
end

#file_info(key, args = nil) ⇒ Object

Return metadata about a file as a hash. Returns the domain, class, length, devcount, etc. as keys. Optionally, device ids (not paths) can be returned as well if :devices is specified and true.

This should only be used for informational purposes, and not usually for dynamically serving files.

mg.file_info("bar")

Returns:

{
  "domain" => "foo",
  "key" => "bar",
  "class" => "default",
  "devcount" => 2,
  "length => 666
}


452
453
454
455
456
# File 'lib/mogilefs/mogilefs.rb', line 452

def file_info(key, args = nil)
  opts = { :domain => @domain, :key => key }
  args and devices = args[:devices] and opts[:devices] = devices ? 1 : 0
  file_info_cleanup(@backend.file_info(opts))
end

#file_info_cleanup(rv) ⇒ Object

:nodoc:



458
459
460
461
462
463
# File 'lib/mogilefs/mogilefs.rb', line 458

def file_info_cleanup(rv) # :nodoc:
  %w(fid length devcount).each { |f| rv[f] = rv[f].to_i }
  devids = rv["devids"] and
    rv["devids"] = devids.split(/,/).map! { |x| x.to_i }
  rv
end

#get_file_data(key, dst = nil, copy_length = nil, src_offset = nil) ⇒ Object

Retrieves the contents of key. If dst is specified, dst should be an IO-like object capable of receiving the write method or a path name. copy_length may be specified to limit the number of bytes to retrieve, and src_offset can be specified to specified the start position of the copy.



147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/mogilefs/mogilefs.rb', line 147

def get_file_data(key, dst = nil, copy_length = nil, src_offset = nil)
  paths = get_paths(key)
  sock = MogileFS::HTTPReader.first(paths, @get_file_data_timeout,
                                    copy_length, src_offset)
  if dst
    sock.stream_to(dst)
  elsif block_given?
    yield(sock)
  else
    sock.to_s
  end
  ensure
    sock.close if sock && ! sock.closed?
end

#get_paths(key, *args) ⇒ Object

Get the paths (URLs as strings) for key. If args is specified, it may contain:

  • :noverify -> boolean, whether or not the tracker checks (default: true)

  • :pathcount -> a positive integer of URLs to retrieve (default: 2)

  • :zone -> “alt” or nil (default: nil)

:noverify defaults to true because this client library is capable of verifying paths for readability itself. It is also faster and more reliable to verify paths on the client.



171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/mogilefs/mogilefs.rb', line 171

def get_paths(key, *args)
  opts = {
    :domain => @domain,
    :key => key,
    :noverify => args[0],
    :zone => args[1],
  }
  if Hash === args[0]
    args = args[0]
    opts[:noverify] = args[:noverify]
    opts[:zone] = args[:zone]
    pathcount = args[:pathcount] and opts[:pathcount] = pathcount.to_i
  end

  opts[:noverify] = false == opts[:noverify] ? 0 : 1
  @backend.respond_to?(:_get_paths) and return @backend._get_paths(opts)
  res = @backend.get_paths(opts)
  (1..res['paths'].to_i).map { |i| res["path#{i}"] }
end

#get_uris(key, *args) ⇒ Object

Get the URIs for key (paths) as URI::HTTP objects



205
206
207
# File 'lib/mogilefs/mogilefs.rb', line 205

def get_uris(key, *args)
  get_paths(key, *args).map! { |path| URI.parse(path) }
end

#list_keys(prefix = "", after = nil, limit = 1000, &block) ⇒ Object

Lists keys starting with prefix following after up to limit. If after is nil the list starts at the beginning.



369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
# File 'lib/mogilefs/mogilefs.rb', line 369

def list_keys(prefix = "", after = nil, limit = 1000, &block)
  @backend.respond_to?(:_list_keys) and
    return @backend._list_keys(domain, prefix, after, limit, &block)

  res = @backend.list_keys(:domain => domain, :prefix => prefix,
                           :after => after, :limit => limit,
                           :ruby_no_raise => true)
  MogileFS::Backend::NoneMatchError === res and return
  raise res if MogileFS::Error === res

  keys = (1..res['key_count'].to_i).map { |i| res["key_#{i}"] }
  if block
    if 1 == block.arity
      keys.each { |key| block.call(key) }
    else
      list_keys_verbose(keys, block)
    end
  end

  [ keys, res['next_after'] ]
end

#list_keys_verbose(keys, block) ⇒ Object

:nodoc:



391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
# File 'lib/mogilefs/mogilefs.rb', line 391

def list_keys_verbose(keys, block) # :nodoc:
  # emulate the MogileFS::Mysql interface, slowly...
  ordered = keys.reverse
  ready = {}
  on_file_info = lambda do |info|
    Hash === info or raise info
    file_info_cleanup(info)

    # deal with trackers with multiple queryworkers responding out-of-order
    ready[info["key"]] = info
    while info = ready.delete(ordered[-1])
      block.call(ordered.pop, info["length"], info["devcount"])
    end
  end
  opts = { :domain => @domain }
  begin
    keys.each do |key|
      opts[:key] = key
      @backend.pipeline_dispatch(:file_info, opts, &on_file_info)
    end
    @backend.pipeline_wait
  rescue MogileFS::Backend::UnknownCommandError # MogileFS < 2.45
    @backend.shutdown # reset the socket
    args = { :pathcount => 0x7fffffff }
    keys.each do |key|
      paths = get_paths(key, args)
      block.call(key, paths_size(paths), paths.size)
    end
  rescue MogileFS::PipelineError, SystemCallError,
         MogileFS::RequestTruncatedError,
         MogileFS::UnreadableSocketError,
         MogileFS::InvalidResponseError, # truncated response
         MogileFS::Timeout
    @backend.shutdown
    keys = (ordered - ready.keys).reverse!
    retry
  rescue
    @backend.shutdown
    raise
  end
end

#new_file(key, args = nil, bytes = nil) ⇒ Object

Creates a new file key in the domain of this object.

bytes is the expected size of the file if known in advance

It operates like File.open(…, “w”) and may take an optional block, yielding an IO-like object with support for the methods documented in MogileFS::NewFile::Writer.

This atomically replaces existing data stored as key when the block exits or when the returned object is closed.

args may contain the following options:

:content_length => Integer

This has the same effect as the (deprecated) bytes parameter.

:largefile => :stream, :content_range or :tempfile

See MogileFS::NewFile for more information on this

:class => String

The MogileFS storage class of the object.

:content_md5 => String, Proc, or :trailer

This can either be a Base64-encoded String, a Proc object, or the :trailer symbol. If given a String, it will be used as the Content-MD5 HTTP header. If given the :trailer symbol, this library will automatically generate an Content-MD5 HTTP trailer. If given a Proc object, this Proc object should give a Base64-encoded string which can be used as the Content-MD5 HTTP trailer when called at the end of the request.

Keep in mind most HTTP servers do not support HTTP trailers, so passing a String is usually the safest way to use this.



247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
# File 'lib/mogilefs/mogilefs.rb', line 247

def new_file(key, args = nil, bytes = nil) # :yields: file
  raise MogileFS::ReadOnlyError if readonly?
  opts = { :key => key, :multi_dest => 1 }
  case args
  when Hash
    opts[:domain] = args[:domain]
    klass = args[:class] and "default" != klass and opts[:class] = klass
  when String
    opts[:class] = args if "default" != args
  end
  opts[:domain] ||= @domain
  res = @backend.create_open(opts)

  dests = if dev_count = res['dev_count'] # multi_dest succeeded
    (1..dev_count.to_i).map { |i| [res["devid_#{i}"], res["path_#{i}"]] }
  else # single destination returned
    # 0x0040:  d0e4 4f4b 2064 6576 6964 3d31 2666 6964  ..OK.devid=1&fid
    # 0x0050:  3d33 2670 6174 683d 6874 7470 3a2f 2f31  =3&path=http://1
    # 0x0060:  3932 2e31 3638 2e31 2e37 323a 3735 3030  92.168.1.72:7500
    # 0x0070:  2f64 6576 312f 302f 3030 302f 3030 302f  /dev1/0/000/000/
    # 0x0080:  3030 3030 3030 3030 3033 2e66 6964 0d0a  0000000003.fid..

    [[res['devid'], res['path']]]
  end

  opts.merge!(args) if Hash === args
  opts[:backend] = @backend
  opts[:fid] = res['fid']
  opts[:content_length] ||= bytes if bytes
  opts[:new_file_max_time] ||= @new_file_max_time
  opts[:start_time] = Time.now

  case (dests[0][1] rescue nil)
  when %r{\Ahttp://}
    http_file = MogileFS::NewFile.new(dests, opts)
    if block_given?
      yield http_file
      return http_file.commit # calls create_close
    else
      return http_file
    end
  when nil, ''
    raise MogileFS::EmptyPathError,
          "Empty path for mogile upload res=#{res.inspect}"
  else
    raise MogileFS::UnsupportedPathError,
          "paths '#{dests.inspect}' returned by backend is not supported"
  end
end

#paths_size(paths) ⇒ Object

:nodoc:



362
363
364
365
# File 'lib/mogilefs/mogilefs.rb', line 362

def paths_size(paths) # :nodoc:
  require "mogilefs/paths_size"
  MogileFS::PathsSize.call(paths)
end

#rename(from, to) ⇒ Object

Renames a key from to key to.



345
346
347
348
349
350
# File 'lib/mogilefs/mogilefs.rb', line 345

def rename(from, to)
  raise MogileFS::ReadOnlyError if readonly?

  @backend.rename :domain => @domain, :from_key => from, :to_key => to
  nil
end

#size(key) ⇒ Object

Returns the size of key.



353
354
355
356
357
358
359
360
# File 'lib/mogilefs/mogilefs.rb', line 353

def size(key)
  @backend.respond_to?(:_size) and return @backend._size(domain, key)
  begin
    file_info(key)["length"].to_i
  rescue MogileFS::Backend::UnknownCommandError
    paths_size(get_paths(key))
  end
end

#sleep(duration) ⇒ Object

Sleeps duration, only used for testing



340
341
342
# File 'lib/mogilefs/mogilefs.rb', line 340

def sleep(duration) # :nodoc:
  @backend.sleep :duration => duration
end

#store_content(key, klass, content, opts = nil) ⇒ Object

Stores content into key in class klass, where content is a String This atomically replaces existing data stored as key



310
311
312
313
314
315
316
317
318
319
320
321
# File 'lib/mogilefs/mogilefs.rb', line 310

def store_content(key, klass, content, opts = nil)
  raise MogileFS::ReadOnlyError if readonly?
  (opts ||= {})[:class] = klass if String === klass

  new_file(key, opts) do |mfp|
    if content.is_a?(MogileFS::Util::StoreContent)
      mfp.streaming_io = content
    else
      mfp << content
    end
  end
end

#store_file(key, klass, file, opts = nil) ⇒ Object

Copies the contents of file into key in class klass. file can be either a path name (String or Pathname object) or an IO-like object that responds to #read or #readpartial. Returns size of file stored. This atomically replaces existing data stored as key



301
302
303
304
305
306
# File 'lib/mogilefs/mogilefs.rb', line 301

def store_file(key, klass, file, opts = nil)
  raise MogileFS::ReadOnlyError if readonly?
  (opts ||= {})[:class] = klass if String === klass

  new_file(key, opts) { |mfp| mfp.big_io = file }
end

#updateclass(key, newclass) ⇒ Object

Updates key to newclass



332
333
334
335
336
337
# File 'lib/mogilefs/mogilefs.rb', line 332

def updateclass(key, newclass)
  raise MogileFS::ReadOnlyError if readonly?

  @backend.updateclass(:domain => @domain, :key => key, :class => newclass)
  true
end