Class: Hadoop::DFS::FileSystem

Inherits: Object
Defined in:
ext/hdfs/hdfs.c

Instance Method Summary

Constructor Details

#new(options = {}) ⇒ Object

Creates a new HDFS client connection, configured by options, returning a new Hadoop::DFS::FileSystem object if successful. If this fails, raises a ConnectError.

options can have the following keys:

  • local: whether to use the local filesystem instead of HDFS (default: false)

  • host: hostname or IP address of a Hadoop NameNode (default: '0.0.0.0')

  • port: port through which to connect to Hadoop NameNode (default: 8020)

  • user: user to connect to filesystem as (default: current user)



# File 'ext/hdfs/hdfs.c', line 179

VALUE HDFS_File_System_initialize(int argc, VALUE* argv, VALUE self) {
  VALUE options;
  rb_scan_args(argc, argv, "01", &options);

  // Treats an omitted options argument as an empty Hash; any other non-Hash
  // value is rejected. (The nil check must come first, or calling .new with
  // no arguments would raise.)
  if (NIL_P(options)) {
    options = rb_hash_new();
  } else if (TYPE(options) != T_HASH) {
    rb_raise(e_dfs_exception, "options must be of type Hash");
  }

  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);

  VALUE r_user = rb_hash_aref(options, rb_eval_string(":user"));
  char* hdfs_user = RTEST(r_user) ? RSTRING_PTR(r_user) : 
      (char*) HDFS_DEFAULT_USER;

  VALUE r_local = rb_hash_aref(options, rb_eval_string(":local"));
  if (r_local == Qtrue) {
    data->fs = hdfsConnectAsUser(NULL, 0, hdfs_user);
  } else {
    VALUE r_host = rb_hash_aref(options, rb_eval_string(":host"));
    VALUE r_port = rb_hash_aref(options, rb_eval_string(":port"));

    // Sets default values for host and port if not supplied by user.
    char* hdfs_host = RTEST(r_host) ? RSTRING_PTR(r_host) : 
        (char*) HDFS_DEFAULT_HOST;
    int hdfs_port   = RTEST(r_port) ? NUM2INT(r_port) :
        HDFS_DEFAULT_PORT;
    data->fs = hdfsConnectAsUser(hdfs_host, hdfs_port, hdfs_user);     
  }
 
  if (data->fs == NULL) {
    rb_raise(e_connect_error, "Failed to connect to HDFS");
    return Qnil;
  } 

  return self;
}
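
A minimal connection sketch in Ruby (the require path, NameNode host, and user below are assumptions, not taken from this page):

require 'hdfs'   # assumed require path for this extension

# Connects to a remote NameNode as the user 'hduser'.
dfs = Hadoop::DFS::FileSystem.new(:host => 'namenode.example.com',
                                  :port => 8020, :user => 'hduser')

# Uses the local filesystem instead of HDFS.
local_fs = Hadoop::DFS::FileSystem.new(:local => true)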

Instance Method Details

#capacity ⇒ Object

Returns the capacity of this HDFS file system in bytes, raising a DFSException if this was unsuccessful.



# File 'ext/hdfs/hdfs.c', line 571

VALUE HDFS_File_System_capacity(VALUE self) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  long capacity = hdfsGetCapacity(data->fs);
  if (capacity < 0) {
    rb_raise(e_dfs_exception, "Error while retrieving capacity");
    return Qnil;
  }
  return LONG2NUM(capacity);
}
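
#capacity pairs naturally with #used (documented below) for a quick utilization report; a sketch assuming dfs is a connected FileSystem from the example above:

total = dfs.capacity
used  = dfs.used
printf("%.1f%% of %d bytes in use\n", 100.0 * used / total, total)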

#cd(path) ⇒ Object

Changes the current working directory to the supplied path. Returns True if successful; raises a DFSException if this fails.



# File 'ext/hdfs/hdfs.c', line 401

VALUE HDFS_File_System_cd(VALUE self, VALUE path) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (hdfsSetWorkingDirectory(data->fs, RSTRING_PTR(path)) < 0) {
    rb_raise(e_dfs_exception,
        "Failed to change current working directory to path: %s",
        RSTRING_PTR(path));
    return Qnil;
  }
  return Qtrue;
}

#chgrp(path, group) ⇒ Object

Changes the group of the supplied path. Returns True if successful; raises a DFSException if this fails.



# File 'ext/hdfs/hdfs.c', line 440

VALUE HDFS_File_System_chgrp(VALUE self, VALUE path, VALUE group) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (hdfsChown(data->fs, RSTRING_PTR(path), NULL, RSTRING_PTR(group)) < 0) {
    rb_raise(e_dfs_exception, "Failed to chgrp path: %s to group: %s",
        RSTRING_PTR(path), RSTRING_PTR(group));
    return Qnil;
  }
  return Qtrue;
}

#chmod(path, mode = 644) ⇒ Object

Changes the mode of the supplied path. Returns True if successful; raises a DFSException if this fails.



# File 'ext/hdfs/hdfs.c', line 458

VALUE HDFS_File_System_chmod(int argc, VALUE* argv, VALUE self) {
  VALUE path, mode;
  rb_scan_args(argc, argv, "11", &path, &mode);
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  short hdfs_mode;
  // Sets default mode if none is supplied.
  if (NIL_P(mode)) {
    hdfs_mode = HDFS_DEFAULT_MODE;
  } else {
    hdfs_mode = octal_decimal(NUM2INT(mode));
  }
  if (hdfsChmod(data->fs, RSTRING_PTR(path), hdfs_mode) < 0){
    rb_raise(e_dfs_exception, "Failed to chmod user path: %s to mode: %d",
        RSTRING_PTR(path), hdfs_mode);
    return Qnil;
  }
  return Qtrue;
}
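
Note that the mode argument is written as a decimal literal whose digits are interpreted as octal (the octal_decimal conversion above). A sketch assuming dfs is a connected FileSystem and the path exists:

dfs.chmod('/user/hduser/report.txt', 755)  # applied as octal 0755
dfs.chmod('/user/hduser/report.txt')       # falls back to the default of 644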

#chown(path, owner) ⇒ Object

Changes the owner of the supplied path. Returns True if successful; raises a DFSException if this fails.



# File 'ext/hdfs/hdfs.c', line 485

VALUE HDFS_File_System_chown(VALUE self, VALUE path, VALUE owner) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (hdfsChown(data->fs, RSTRING_PTR(path), RSTRING_PTR(owner), NULL) < 0) {
    rb_raise(e_dfs_exception, "Failed to chown user path: %s to user: %s",
        RSTRING_PTR(path), RSTRING_PTR(owner));
    return Qnil;
  }
  return Qtrue;
}

#copy(from_path, to_path, to_fs = nil) ⇒ Object

Copies the file at HDFS location from_path to HDFS location to_path. If to_fs is specified, copies to that filesystem rather than the current one. If successful, returns True; otherwise, raises a DFSException.



# File 'ext/hdfs/hdfs.c', line 504

VALUE HDFS_File_System_copy(int argc, VALUE* argv, VALUE self) {
  VALUE from_path, to_path, to_fs;
  rb_scan_args(argc, argv, "21", &from_path, &to_path, &to_fs);
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  hdfsFS destFS = data->fs;
  // If no to_fs is supplied, copies to the current file system.
  if (!NIL_P(to_fs)) {
    if (CLASS_OF(to_fs) == c_file_system) {
      FSData* destFSData = NULL;
      Data_Get_Struct(to_fs, FSData, destFSData);
      destFS = destFSData->fs;
    } else {
      rb_raise(rb_eArgError, "to_fs must be of type Hadoop::DFS::FileSystem");
      return Qnil;
    }
  }
  if (hdfsCopy(data->fs, RSTRING_PTR(from_path), destFS, 
      RSTRING_PTR(to_path)) < 0) {
    rb_raise(e_dfs_exception, "Failed to copy path: %s to path: %s",
        RSTRING_PTR(from_path), RSTRING_PTR(to_path));
    return Qnil;
  }
  return Qtrue;
}
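
A sketch of both forms, assuming dfs is a connected FileSystem (the second cluster's host is an assumption):

# Copies within the current filesystem.
dfs.copy('/tmp/source.txt', '/tmp/backup.txt')

# Copies into a different cluster's filesystem.
other = Hadoop::DFS::FileSystem.new(:host => 'backup-nn.example.com',
                                    :port => 8020)
dfs.copy('/tmp/source.txt', '/tmp/backup.txt', other)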

#create_directory(path) ⇒ Object

Creates a directory at the supplied path. If successful, returns True; raises a DFSException if this fails.



# File 'ext/hdfs/hdfs.c', line 300

VALUE HDFS_File_System_create_directory(VALUE self, VALUE path) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (hdfsCreateDirectory(data->fs, RSTRING_PTR(path)) < 0) {
    rb_raise(e_dfs_exception, "Could not create directory at path: %s",
        RSTRING_PTR(path));
    return Qnil;
  }
  return Qtrue;
}

#cwd ⇒ Object

Returns the current working directory as a String; raises a DFSException if this fails.



# File 'ext/hdfs/hdfs.c', line 419

VALUE HDFS_File_System_cwd(VALUE self) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  char* cur_dir = (char *) malloc(
      sizeof(char) * HDFS_DEFAULT_PATH_STRING_LENGTH);
  if (hdfsGetWorkingDirectory(data->fs, cur_dir,
      HDFS_DEFAULT_PATH_STRING_LENGTH) < 0) {
    free(cur_dir);
    rb_raise(e_dfs_exception, "Failed to get current working directory");
    return Qnil;
  }
  VALUE ruby_cur_dir = rb_tainted_str_new2(cur_dir);
  // Frees the malloc'd buffer after its contents are copied into the Ruby
  // string, avoiding a leak on the success path.
  free(cur_dir);
  return ruby_cur_dir;
}
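
#cd and #cwd work together in the obvious way; a sketch assuming dfs is a connected FileSystem:

dfs.cd('/user/hduser')
puts dfs.cwd   # => "/user/hduser"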

#default_block_size ⇒ Object

Returns the default block size of this HDFS file system in bytes, raising a DFSException if this was unsuccessful.



# File 'ext/hdfs/hdfs.c', line 589

VALUE HDFS_File_System_default_block_size(VALUE self) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  long block_size = hdfsGetDefaultBlockSize(data->fs);
  if (block_size < 0) {
    rb_raise(e_dfs_exception, "Error while retrieving default block size");
    return Qnil;
  }
  return LONG2NUM(block_size);
}

#default_block_size_at_path(path) ⇒ Object

Returns the default block size at the supplied HDFS path, raising a DFSException if this was unsuccessful.



# File 'ext/hdfs/hdfs.c', line 607

VALUE HDFS_File_System_default_block_size_at_path(VALUE self, VALUE path) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  long block_size = hdfsGetDefaultBlockSizeAtPath(data->fs, RSTRING_PTR(path));
  if (block_size < 0) {
    rb_raise(e_dfs_exception,
        "Error while retrieving default block size at path: %s", RSTRING_PTR(path));
    return Qnil;
  }
  return LONG2NUM(block_size);
}

#delete(path, recursive = false) ⇒ Object

Deletes the file at the supplied path, recursively if specified. Returns True if successful; raises a DFSException if this fails.



# File 'ext/hdfs/hdfs.c', line 244

VALUE HDFS_File_System_delete(int argc, VALUE* argv, VALUE self) {
  VALUE path, recursive;
  rb_scan_args(argc, argv, "11", &path, &recursive);
  int hdfs_recursive = HDFS_DEFAULT_RECURSIVE_DELETE;
  if (!NIL_P(recursive)) {
    hdfs_recursive = (recursive == Qtrue) ? 1 : 0;
  }
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (hdfsDelete(data->fs, RSTRING_PTR(path), hdfs_recursive) < 0) {
    rb_raise(e_dfs_exception, "Could not delete file at path: %s",
        RSTRING_PTR(path));
    return Qnil;
  }
  return Qtrue;
}
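
A sketch assuming dfs is a connected FileSystem; #create_directory is documented above:

dfs.create_directory('/tmp/scratch')
dfs.delete('/tmp/scratch', true)   # recursive delete of the directory
dfs.delete('/tmp/stale.txt')       # defaults to a non-recursive delete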

#disconnect ⇒ nil

Disconnects the client connection.

Returns:

  • (nil)


# File 'ext/hdfs/hdfs.c', line 227

VALUE HDFS_File_System_disconnect(VALUE self) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (data->fs != NULL) {
    hdfsDisconnect(data->fs);
    data->fs = NULL;
  }
  return Qnil;
}
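
A common pattern is to guarantee teardown with ensure; a sketch (the NameNode host is an assumption):

dfs = Hadoop::DFS::FileSystem.new(:host => 'namenode.example.com')
begin
  # ... work with dfs ...
ensure
  dfs.disconnect
end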

#exist(path) ⇒ Boolean

Checks whether a file exists at the supplied path. Returns True if the file exists; False if not.

Returns:

  • (Boolean)


# File 'ext/hdfs/hdfs.c', line 286

VALUE HDFS_File_System_exist(VALUE self, VALUE path) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  int success = hdfsExists(data->fs, RSTRING_PTR(path));
  return success == 0 ? Qtrue : Qfalse;
}
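
Because #exist returns a boolean rather than raising, it works well as a guard; a sketch assuming dfs is a connected FileSystem:

dfs.delete('/tmp/stale.txt') if dfs.exist('/tmp/stale.txt')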

#get_hosts(path, start, length) ⇒ Object

Returns the hostnames of the DataNodes serving the portion of the file that starts at byte offset start and extends for length bytes. Raises a DFSException if this fails.



# File 'ext/hdfs/hdfs.c', line 627

VALUE HDFS_File_System_get_hosts(VALUE self, VALUE path, VALUE start,
    VALUE length) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  char*** hosts = hdfsGetHosts(data->fs, RSTRING_PTR(path), NUM2LONG(start),
      NUM2LONG(length));
  if (hosts == NULL) {
    rb_raise(e_dfs_exception,
        "Error while retrieving hosts at path: %s, start: %lu, length: %lu",
        RSTRING_PTR(path), NUM2LONG(start), NUM2LONG(length));
    return Qnil;
  }
  // Builds a Ruby Array object out of the hosts reported by HDFS.
  VALUE hosts_array = rb_ary_new();
  size_t i, j;
  for (i = 0; hosts[i]; i++) {
    VALUE cur_block_hosts = rb_ary_new();
    for (j = 0; hosts[i][j]; j++) {
      rb_ary_push(cur_block_hosts, rb_str_new2(hosts[i][j]));
    }
    rb_ary_push(hosts_array, cur_block_hosts);
  }
  // Releases the host array allocated by libhdfs.
  hdfsFreeHosts(hosts);
  return hosts_array;
}
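
The return value is an Array with one inner Array of hostnames per block in the requested byte range; a sketch assuming dfs is a connected FileSystem:

hosts = dfs.get_hosts('/tmp/big.log', 0, 128 * 1024 * 1024)
hosts.each_with_index do |block_hosts, i|
  puts "block #{i}: #{block_hosts.join(', ')}"
end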

#list_directory(path) ⇒ Object

Lists the directory at the supplied path, returning an Array of Hadoop::DFS::FileInfo objects. If the directory does not exist, raises a DoesNotExistError.



# File 'ext/hdfs/hdfs.c', line 319

VALUE HDFS_File_System_list_directory(VALUE self, VALUE path) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  VALUE file_infos = rb_ary_new();
  int num_files = 0;
  hdfsFileInfo* infos = hdfsListDirectory(data->fs, RSTRING_PTR(path), &num_files);

  if (infos == NULL) {
    if (hdfsExists(data->fs, RSTRING_PTR(path)) == 0) {
      return file_infos;
    } else {
      rb_raise(e_does_not_exist, "Directory does not exist: %s", RSTRING_PTR(path));
      return Qnil;
    }
  }

  int i;
  for (i = 0; i < num_files; i++) {
    hdfsFileInfo* cur_info = infos + i;
    rb_ary_push(file_infos, wrap_hdfsFileInfo(cur_info));
  }

  hdfsFreeFileInfo(infos, num_files);
  return file_infos;
}
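
A sketch assuming dfs is a connected FileSystem; the name accessor on the returned objects is an assumption, so consult Hadoop::DFS::FileInfo for the attributes it actually exposes:

dfs.list_directory('/user/hduser').each do |info|
  puts info.name   # assumed FileInfo accessor
end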

#move(from_path, to_path, to_fs = nil) ⇒ Object

Moves the file at HDFS location from_path to HDFS location to_path. If to_fs is specified, moves to that filesystem rather than the current one. If successful, returns True; otherwise, raises a DFSException.



# File 'ext/hdfs/hdfs.c', line 538

VALUE HDFS_File_System_move(int argc, VALUE* argv, VALUE self) {
  VALUE from_path, to_path, to_fs;
  rb_scan_args(argc, argv, "21", &from_path, &to_path, &to_fs);
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  hdfsFS destFS = data->fs;
  // If no to_fs is supplied, moves to the current file system.
  if (!NIL_P(to_fs)) {
    if (CLASS_OF(to_fs) == c_file_system) {
      FSData* destFSData = NULL;
      Data_Get_Struct(to_fs, FSData, destFSData);
      destFS = destFSData->fs;
    } else {
      rb_raise(rb_eArgError, "to_fs must be of type Hadoop::DFS::FileSystem");
      return Qnil;
    }
  }
  if (hdfsMove(data->fs, RSTRING_PTR(from_path), destFS,
      RSTRING_PTR(to_path)) < 0) {
    rb_raise(e_dfs_exception, "Error while moving path: %s to path: %s",
        RSTRING_PTR(from_path), RSTRING_PTR(to_path));
    return Qnil;
  }
  return Qtrue;
}

#open(path, mode = 'r', options = {}) ⇒ File

Opens a file using the supplied mode and options. If the file cannot be opened, raises a CouldNotOpenError; otherwise, returns a Hadoop::DFS::File object corresponding to the file.

options can have the following keys:

  • buffer_size: size in bytes of buffer to use for file accesses (default: default buffer size as configured by HDFS)

  • replication: the number of nodes this file should be replicated against (default: default replication as configured by HDFS)

  • block_size: the HDFS block size in bytes to use for this file (default: default block size as configured by HDFS)

Returns:

  • (File)

# File 'ext/hdfs/hdfs.c', line 721

VALUE HDFS_File_System_open(int argc, VALUE* argv, VALUE self) {
  VALUE path, mode, options;
  int flags = O_RDONLY;
  rb_scan_args(argc, argv, "12", &path, &mode, &options);
  // Sets file open mode if one is provided by the user.
  if (!NIL_P(mode)) {
    if (strcmp("r", StringValuePtr(mode)) == 0) {
      flags = O_RDONLY;
    } else if (strcmp("w", StringValuePtr(mode)) == 0) {
      flags = O_WRONLY;
    } else {
      rb_raise(rb_eArgError, "Mode must be 'r' or 'w'");
      return Qnil;
    }
  }
  options = NIL_P(options) ? rb_hash_new() : options;
  VALUE r_buffer_size = rb_hash_aref(options, rb_eval_string(":buffer_size"));
  VALUE r_replication = rb_hash_aref(options, rb_eval_string(":replication"));
  VALUE r_block_size = rb_hash_aref(options, rb_eval_string(":block_size"));
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  hdfsFile file = hdfsOpenFile(data->fs, RSTRING_PTR(path), flags,
      RTEST(r_buffer_size) ? NUM2INT(r_buffer_size) : 0,
      RTEST(r_replication) ? NUM2INT(r_replication) : 0,
      RTEST(r_block_size) ? NUM2INT(r_block_size) : 0);
  if (file == NULL) {
    rb_raise(e_could_not_open, "Could not open file %s", RSTRING_PTR(path));
    return Qnil;
  }
  FileData* file_data = ALLOC_N(FileData, 1);
  file_data->fs = data->fs;
  file_data->file = file;
  VALUE file_instance = Data_Wrap_Struct(c_file, NULL, free_file_data, file_data);
  return file_instance;
}
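
A write-then-read sketch assuming dfs is a connected FileSystem; the write, read, and close calls on the returned object are assumptions, so consult Hadoop::DFS::File for its actual interface:

file = dfs.open('/tmp/greeting.txt', 'w', :replication => 2)
file.write("hello\n")   # assumed Hadoop::DFS::File method
file.close              # assumed Hadoop::DFS::File method

file = dfs.open('/tmp/greeting.txt')   # mode defaults to 'r'
puts file.read          # assumed Hadoop::DFS::File method
file.close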

#rename(from_path, to_path) ⇒ Object

Renames the file at the supplied path to the file at the destination path. Returns True if successful; raises a DFSException if this fails.



# File 'ext/hdfs/hdfs.c', line 268

VALUE HDFS_File_System_rename(VALUE self, VALUE from_path, VALUE to_path) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (hdfsRename(data->fs, RSTRING_PTR(from_path), RSTRING_PTR(to_path)) < 0) {
    rb_raise(e_dfs_exception, "Could not rename path: %s to path: %s",
        RSTRING_PTR(from_path), RSTRING_PTR(to_path));
    return Qnil;
  }
  return Qtrue;
}

#set_replication(path, replication = 3) ⇒ Object

Sets the replication factor of the supplied path to the given number of nodes. Returns True if successful; raises a DFSException if this fails.



# File 'ext/hdfs/hdfs.c', line 374

VALUE HDFS_File_System_set_replication(int argc, VALUE* argv, VALUE self) {
  VALUE path, replication;
  rb_scan_args(argc, argv, "11", &path, &replication);
  int hdfs_replication;
  // If no replication value is supplied, uses default replication value.
  if (NIL_P(replication)) {
    hdfs_replication = HDFS_DEFAULT_REPLICATION;
  } else {
    hdfs_replication = NUM2INT(replication);
  }
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (hdfsSetReplication(data->fs, RSTRING_PTR(path), hdfs_replication) < 0) {
    rb_raise(e_dfs_exception, "Failed to set replication to: %d at path: %s",
        hdfs_replication, RSTRING_PTR(path));
    return Qnil;
  }
  return Qtrue;
}
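
A sketch assuming dfs is a connected FileSystem:

dfs.set_replication('/tmp/important.dat', 5)   # five replicas
dfs.set_replication('/tmp/ordinary.dat')       # default of 3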

#stat(path) ⇒ Object

Stats the file or directory at the supplied path, returning a Hadoop::DFS::FileInfo object corresponding to it. If the file or directory does not exist, raises a DoesNotExistError.



# File 'ext/hdfs/hdfs.c', line 353

VALUE HDFS_File_System_stat(VALUE self, VALUE path) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  hdfsFileInfo* info = hdfsGetPathInfo(data->fs, RSTRING_PTR(path));
  if (info == NULL) {
    rb_raise(e_does_not_exist, "File does not exist: %s", RSTRING_PTR(path));
    return Qnil;
  }
  VALUE file_info = wrap_hdfsFileInfo(info);
  hdfsFreeFileInfo(info, 1);
  return file_info;
}

#used ⇒ Object

Returns the bytes currently in use by this filesystem, raising a DFSException if unsuccessful.



# File 'ext/hdfs/hdfs.c', line 659

VALUE HDFS_File_System_used(VALUE self) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  long used = hdfsGetUsed(data->fs);
  if (used < 0) {
    rb_raise(e_dfs_exception, "Error while retrieving used capacity");
    return Qnil;
  }
  return LONG2NUM(used);
}

#utime(path, modified_time = nil, access_time = nil) ⇒ Object

Changes the last modified and/or last access time, in seconds since the Unix epoch, of the supplied file. Returns True if successful; raises a DFSException if this fails.



# File 'ext/hdfs/hdfs.c', line 677

VALUE HDFS_File_System_utime(int argc, VALUE* argv, VALUE self) {
  VALUE path, modified_time, access_time;
  tTime hdfs_modified_time, hdfs_access_time;
  rb_scan_args(argc, argv, "12", &path, &modified_time, &access_time);
  // Sets default values for last modified and/or last access time.
  if (NIL_P(modified_time)) {
    hdfs_modified_time = -1;
  } else {
    hdfs_modified_time = NUM2LONG(modified_time);
  }
  if (NIL_P(access_time)) {
    hdfs_access_time = -1;
  } else {
    hdfs_access_time = NUM2LONG(access_time);
  }
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (hdfsUtime(data->fs, RSTRING_PTR(path), hdfs_modified_time,
      hdfs_access_time) < 0) {
    rb_raise(e_dfs_exception,
        "Error while setting modified time: %lu, access time: %lu at path: %s",
        (long) hdfs_modified_time, (long) hdfs_access_time, RSTRING_PTR(path));
    return Qnil;
  }
  return Qtrue;
}
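
A sketch assuming dfs is a connected FileSystem; times are plain Unix epoch seconds, and a nil (or omitted) argument is passed through as -1, which leaves that timestamp unchanged:

now = Time.now.to_i
dfs.utime('/tmp/touched.txt', now, now)   # sets both timestamps
dfs.utime('/tmp/touched.txt', now)        # access time left unchanged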