Class: Hadoop::DFS::FileSystem

Inherits: Object
Defined in:
ext/hdfs/hdfs.c

Instance Method Summary

Constructor Details

#new(options = {}) ⇒ Object

Creates a new HDFS client connection, configured by options, returning a new Hadoop::DFS::FileSystem object if successful. If this fails, raises a ConnectError.

options can have the following keys (a usage sketch follows the list):

  • local: whether to use the local filesystem instead of HDFS (default: false)
  • host: hostname or IP address of a Hadoop NameNode (default: ‘0.0.0.0’)

  • port: port through which to connect to Hadoop NameNode (default: 8020)

  • user: user to connect to filesystem as (default: current user)
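
A minimal usage sketch (the require name, hostname, and user are illustrative assumptions, not prescribed by this page):

  require 'hdfs'

  # Connect to a remote NameNode as a specific user.
  dfs = Hadoop::DFS::FileSystem.new(:host => 'namenode.example.com',
                                    :port => 8020, :user => 'hduser')

  # Or operate on the local filesystem instead of HDFS.
  local_fs = Hadoop::DFS::FileSystem.new(:local => true)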



# File 'ext/hdfs/hdfs.c', line 178

VALUE HDFS_File_System_initialize(int argc, VALUE* argv, VALUE self) {
  VALUE options;
  rb_scan_args(argc, argv, "01", &options);

  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);

  if (NIL_P(options)) {
    options = rb_hash_new();
  }

  VALUE r_user = rb_hash_aref(options, rb_eval_string(":user"));
  char* hdfs_user = RTEST(r_user) ? RSTRING_PTR(r_user) : 
      (char*) HDFS_DEFAULT_USER;

  VALUE r_local = rb_hash_aref(options, rb_eval_string(":local"));
  if (r_local == Qtrue) {
    data->fs = hdfsConnectAsUser(NULL, 0, hdfs_user);
  } else {
    VALUE r_host = rb_hash_aref(options, rb_eval_string(":host"));
    VALUE r_port = rb_hash_aref(options, rb_eval_string(":port"));

    // Sets default values for host and port if not supplied by user.
    char* hdfs_host = RTEST(r_host) ? RSTRING_PTR(r_host) : 
        (char*) HDFS_DEFAULT_HOST;
    int hdfs_port   = RTEST(r_port) ? NUM2INT(r_port) :
        HDFS_DEFAULT_PORT;
    data->fs = hdfsConnectAsUser(hdfs_host, hdfs_port, hdfs_user);     
  }
 
  if (data->fs == NULL) {
    rb_raise(e_connect_error, "Failed to connect to HDFS");
    return Qnil;
  } 

  return self;
}

Instance Method Details

#capacity ⇒ Object

Returns the capacity of this HDFS file system in bytes, raising a DFSException if this was unsuccessful.
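
For example (assuming dfs is a connected FileSystem as in the constructor example above):

  puts dfs.capacity   # total capacity of the filesystem, in bytes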



# File 'ext/hdfs/hdfs.c', line 561

VALUE HDFS_File_System_capacity(VALUE self) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  long capacity = hdfsGetCapacity(data->fs);
  if (capacity < 0) {
    rb_raise(e_dfs_exception, "Error while retrieving capacity");
    return Qnil;
  }
  return LONG2NUM(capacity);
}

#cd(path) ⇒ Object

Changes the current working directory to the supplied path. Returns True if successful; raises a DFSException if this fails.
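
A usage sketch (the path is illustrative):

  dfs.cd('/user/hduser')
  puts dfs.cwd   # prints the new working directory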



# File 'ext/hdfs/hdfs.c', line 391

VALUE HDFS_File_System_cd(VALUE self, VALUE path) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (hdfsSetWorkingDirectory(data->fs, RSTRING_PTR(path)) < 0) {
    rb_raise(e_dfs_exception,
        "Failed to change current working directory to path: %s",
        RSTRING_PTR(path));
    return Qnil;
  }
  return Qtrue;
}

#chgrp(path, group) ⇒ Object

Changes the group of the supplied path. Returns True if successful; raises a DFSException if this fails.



# File 'ext/hdfs/hdfs.c', line 430

VALUE HDFS_File_System_chgrp(VALUE self, VALUE path, VALUE group) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (hdfsChown(data->fs, RSTRING_PTR(path), NULL, RSTRING_PTR(group)) < 0) {
    rb_raise(e_dfs_exception, "Failed to chgrp path: %s to group: %s",
        RSTRING_PTR(path), RSTRING_PTR(group));
    return Qnil;
  }
  return Qtrue;
}

#chmod(path, mode = 644) ⇒ Object

Changes the mode of the supplied path. Returns True if successful; raises a DFSException if this fails.
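
For example (path illustrative; the mode is written with octal digits, as in the signature above):

  dfs.chmod('/user/hduser/report.txt', 755)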



# File 'ext/hdfs/hdfs.c', line 448

VALUE HDFS_File_System_chmod(int argc, VALUE* argv, VALUE self) {
  VALUE path, mode;
  rb_scan_args(argc, argv, "11", &path, &mode);
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  short hdfs_mode;
  // Sets default mode if none is supplied.
  if (NIL_P(mode)) {
    hdfs_mode = HDFS_DEFAULT_MODE;
  } else {
    hdfs_mode = octal_decimal(NUM2INT(mode));
  }
  if (hdfsChmod(data->fs, RSTRING_PTR(path), hdfs_mode) < 0){
    rb_raise(e_dfs_exception, "Failed to chmod user path: %s to mode: %d",
        RSTRING_PTR(path), hdfs_mode);
    return Qnil;
  }
  return Qtrue;
}

#chown(path, owner) ⇒ Object

Changes the owner of the supplied path. Returns True if successful; raises a DFSException if this fails.
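
For example (path, user, and group are illustrative):

  dfs.chown('/user/hduser/report.txt', 'hduser')
  dfs.chgrp('/user/hduser/report.txt', 'hadoop')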



# File 'ext/hdfs/hdfs.c', line 475

VALUE HDFS_File_System_chown(VALUE self, VALUE path, VALUE owner) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (hdfsChown(data->fs, RSTRING_PTR(path), RSTRING_PTR(owner), NULL) < 0) {
    rb_raise(e_dfs_exception, "Failed to chown user path: %s to user: %s",
        RSTRING_PTR(path), RSTRING_PTR(owner));
    return Qnil;
  }
  return Qtrue;
}

#copy(from_path, to_path, to_fs = nil) ⇒ Object

Copies the file at HDFS location from_path to HDFS location to_path. If to_fs is specified, copies to that filesystem rather than the current one. If successful, returns True; otherwise, raises a DFSException.
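
A sketch of copying within one filesystem and across two (hosts and paths are illustrative):

  dfs.copy('/tmp/source.txt', '/tmp/dest.txt')

  backup = Hadoop::DFS::FileSystem.new(:host => 'backup-nn.example.com')
  dfs.copy('/tmp/source.txt', '/tmp/dest.txt', backup)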



# File 'ext/hdfs/hdfs.c', line 494

VALUE HDFS_File_System_copy(int argc, VALUE* argv, VALUE self) {
  VALUE from_path, to_path, to_fs;
  rb_scan_args(argc, argv, "21", &from_path, &to_path, &to_fs);
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  hdfsFS destFS = data->fs;
  // If no to_fs is supplied, copies to the current file system.
  if (!NIL_P(to_fs)) {
    if (CLASS_OF(to_fs) == c_file_system) {
      FSData* destFSData = NULL;
      Data_Get_Struct(to_fs, FSData, destFSData);
      destFS = destFSData->fs;
    } else {
      rb_raise(rb_eArgError, "to_fs must be of type Hadoop::DFS::FileSystem");
      return Qnil;
    }
  }
  if (hdfsCopy(data->fs, RSTRING_PTR(from_path), destFS, 
      RSTRING_PTR(to_path)) < 0) {
    rb_raise(e_dfs_exception, "Failed to copy path: %s to path: %s",
        RSTRING_PTR(from_path), RSTRING_PTR(to_path));
    return Qnil;
  }
  return Qtrue;
}

#create_directory(path) ⇒ Object

Creates a directory at the supplied path. If successful, returns True; raises a DFSException if this fails.
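
For example (path illustrative):

  dfs.create_directory('/user/hduser/logs')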



# File 'ext/hdfs/hdfs.c', line 295

VALUE HDFS_File_System_create_directory(VALUE self, VALUE path) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (hdfsCreateDirectory(data->fs, RSTRING_PTR(path)) < 0) {
    rb_raise(e_dfs_exception, "Could not create directory at path: %s",
        RSTRING_PTR(path));
    return Qnil;
  }
  return Qtrue;
}

#cwd ⇒ Object

Returns the current working directory as a String; raises a DFSException if this fails.



# File 'ext/hdfs/hdfs.c', line 409

VALUE HDFS_File_System_cwd(VALUE self) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  char* cur_dir = (char *) malloc(
      sizeof(char) * HDFS_DEFAULT_PATH_STRING_LENGTH);
  if (hdfsGetWorkingDirectory(data->fs, cur_dir,
      HDFS_DEFAULT_PATH_STRING_LENGTH) < 0) {
    free(cur_dir);
    rb_raise(e_dfs_exception, "Failed to get current working directory");
    return Qnil;
  }
  // Builds the Ruby String first so the malloc'd buffer can be freed.
  VALUE ruby_cwd = rb_str_new2(cur_dir);
  free(cur_dir);
  return ruby_cwd;
}

#default_block_size ⇒ Object

Returns the default block size of this HDFS file system in bytes, raising a DFSException if this was unsuccessful.



# File 'ext/hdfs/hdfs.c', line 579

VALUE HDFS_File_System_default_block_size(VALUE self) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  long block_size = hdfsGetDefaultBlockSize(data->fs);
  if (block_size < 0) {
    rb_raise(e_dfs_exception, "Error while retrieving default block size");
    return Qnil;
  }
  return LONG2NUM(block_size);
}

#default_block_size_at_path(path) ⇒ Object

Returns the default block size at the supplied HDFS path, raising a DFSException if this was unsuccessful.



# File 'ext/hdfs/hdfs.c', line 597

VALUE HDFS_File_System_default_block_size_at_path(VALUE self, VALUE path) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  long block_size = hdfsGetDefaultBlockSizeAtPath(data->fs, RSTRING_PTR(path));
  if (block_size < 0) {
    rb_raise(e_dfs_exception,
        "Error while retrieving default block size at path: %s", RSTRING_PTR(path));
    return Qnil;
  }
  return LONG2NUM(block_size);
}

#delete(path, recursive = false) ⇒ Object

Deletes the file at the supplied path, recursively if specified. Returns True if successful; raises a DFSException if this fails.
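
For example (paths illustrative):

  dfs.delete('/tmp/scratch.txt')
  dfs.delete('/tmp/scratch_dir', true)   # delete a directory recursively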



# File 'ext/hdfs/hdfs.c', line 239

VALUE HDFS_File_System_delete(int argc, VALUE* argv, VALUE self) {
  VALUE path, recursive;
  rb_scan_args(argc, argv, "11", &path, &recursive);
  int hdfs_recursive = HDFS_DEFAULT_RECURSIVE_DELETE;
  if (!NIL_P(recursive)) {
    hdfs_recursive = (recursive == Qtrue) ? 1 : 0;
  }
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (hdfsDelete(data->fs, RSTRING_PTR(path), hdfs_recursive) < 0) {
    rb_raise(e_dfs_exception, "Could not delete file at path: %s",
        RSTRING_PTR(path));
    return Qnil;
  }
  return Qtrue;
}

#disconnect ⇒ nil

Disconnects the client connection.




# File 'ext/hdfs/hdfs.c', line 222

VALUE HDFS_File_System_disconnect(VALUE self) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (data->fs != NULL) {
    hdfsDisconnect(data->fs);
    data->fs = NULL;
  }
  return Qnil;
}

#exist(path) ⇒ Boolean

Checks whether a file exists at the supplied path. If the file exists, returns True; otherwise, returns False.




# File 'ext/hdfs/hdfs.c', line 281

VALUE HDFS_File_System_exist(VALUE self, VALUE path) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  int success = hdfsExists(data->fs, RSTRING_PTR(path));
  return success == 0 ? Qtrue : Qfalse;
}

#get_hosts(path, start, length) ⇒ Object

Returns the hostnames of the DataNodes that serve each block of the file region starting at byte offset start and extending length bytes, as one Array of hostnames per block. Raises a DFSException if this fails.
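
A sketch (path and byte range illustrative; each inner Array lists the hosts serving one block):

  hosts = dfs.get_hosts('/tmp/big_file.bin', 0, 128 * 1024 * 1024)
  # => [['datanode1.example.com', 'datanode2.example.com'], ...]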



# File 'ext/hdfs/hdfs.c', line 617

VALUE HDFS_File_System_get_hosts(VALUE self, VALUE path, VALUE start,
    VALUE length) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  char*** hosts = hdfsGetHosts(data->fs, RSTRING_PTR(path), NUM2LONG(start),
      NUM2LONG(length));
  if (hosts == NULL) {
    rb_raise(e_dfs_exception,
        "Error while retrieving hosts at path: %s, start: %lu, length: %lu",
        RSTRING_PTR(path), NUM2LONG(start), NUM2LONG(length));
    return Qnil;
  }
  // Builds a Ruby Array object out of the hosts reported by HDFS.
  VALUE hosts_array = rb_ary_new();
  size_t i, j;
  for (i = 0; hosts[i]; i++) {
    VALUE cur_block_hosts = rb_ary_new();
    for (j = 0; hosts[i][j]; j++) {
      rb_ary_push(cur_block_hosts, rb_str_new2(hosts[i][j]));
    }
    rb_ary_push(hosts_array, cur_block_hosts);
  }
  return hosts_array;
}

#list_directory(path) ⇒ Object

Lists the directory at the supplied path, returning an Array of Hadoop::DFS::FileInfo objects. If the directory does not exist, raises a DoesNotExistError.
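
For example (path illustrative; the name reader is assumed from the gem's Hadoop::DFS::FileInfo class):

  dfs.list_directory('/user/hduser').each do |info|
    puts info.name
  end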



# File 'ext/hdfs/hdfs.c', line 314

VALUE HDFS_File_System_list_directory(VALUE self, VALUE path) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  VALUE file_infos = rb_ary_new();
  int num_files = 0;
  hdfsFileInfo* infos = hdfsListDirectory(data->fs, RSTRING_PTR(path),
      &num_files);
  if (infos == NULL) {
    rb_raise(e_does_not_exist, "Directory does not exist: %s",
        RSTRING_PTR(path));
    return Qnil;
  }
  int i;
  for (i = 0; i < num_files; i++) {
    hdfsFileInfo* cur_info = infos + i;
    rb_ary_push(file_infos, wrap_hdfsFileInfo(cur_info));
  }
  hdfsFreeFileInfo(infos, num_files);
  return file_infos;
}

#move(from_path, to_path, to_fs = nil) ⇒ Object

Moves the file at HDFS location from_path to HDFS location to_path. If to_fs is specified, moves to that filesystem rather than the current one. If successful, returns True; otherwise, raises a DFSException.



# File 'ext/hdfs/hdfs.c', line 528

VALUE HDFS_File_System_move(int argc, VALUE* argv, VALUE self) {
  VALUE from_path, to_path, to_fs;
  rb_scan_args(argc, argv, "21", &from_path, &to_path, &to_fs);
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  hdfsFS destFS = data->fs;
  // If no to_fs is supplied, moves to the current file system.
  if (!NIL_P(to_fs)) {
    if (CLASS_OF(to_fs) == c_file_system) {
      FSData* destFSData = NULL;
      Data_Get_Struct(to_fs, FSData, destFSData);
      destFS = destFSData->fs;
    } else {
      rb_raise(rb_eArgError, "to_fs must be of type Hadoop::DFS::FileSystem");
      return Qnil;
    }
  }
  if (hdfsMove(data->fs, RSTRING_PTR(from_path), destFS,
      RSTRING_PTR(to_path)) < 0) {
    rb_raise(e_dfs_exception, "Error while moving path: %s to path: %s",
        RSTRING_PTR(from_path), RSTRING_PTR(to_path));
    return Qnil;
  }
  return Qtrue;
}

#open(path, mode = 'r', options = {}) ⇒ File

Opens a file using the supplied mode and options. If the file cannot be opened, raises a CouldNotOpenError; otherwise, returns a Hadoop::DFS::File object corresponding to the file.

options can have the following keys (a usage sketch follows the list):

  • buffer_size: size in bytes of buffer to use for file accesses (default: default buffer size as configured by HDFS)

  • replication: the number of nodes this file should be replicated against (default: default replication as configured by HDFS)

  • block_size: the HDFS block size in bytes to use for this file (default: default block size as configured by HDFS)
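
A write-then-read usage sketch (path and contents illustrative; write, read, and close are assumed from the gem's Hadoop::DFS::File class):

  file = dfs.open('/tmp/greeting.txt', 'w')
  file.write('hello from ruby-hdfs')
  file.close

  file = dfs.open('/tmp/greeting.txt', 'r')
  puts file.read
  file.close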




# File 'ext/hdfs/hdfs.c', line 711

VALUE HDFS_File_System_open(int argc, VALUE* argv, VALUE self) {
  VALUE path, mode, options;
  int flags = O_RDONLY;
  rb_scan_args(argc, argv, "12", &path, &mode, &options);
  // Sets file open mode if one is provided by the user.
  if (!NIL_P(mode)) {
    if (strcmp("r", StringValuePtr(mode)) == 0) {
      flags = O_RDONLY;
    } else if (strcmp("w", StringValuePtr(mode)) == 0) {
      flags = O_WRONLY;
    } else {
      rb_raise(rb_eArgError, "Mode must be 'r' or 'w'");
      return Qnil;
    }
  }
  if (NIL_P(options)) {
    options = rb_hash_new();
  }
  VALUE r_buffer_size = rb_hash_aref(options, rb_eval_string(":buffer_size"));
  VALUE r_replication = rb_hash_aref(options, rb_eval_string(":replication"));
  VALUE r_block_size = rb_hash_aref(options, rb_eval_string(":block_size"));
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  hdfsFile file = hdfsOpenFile(data->fs, RSTRING_PTR(path), flags, 
    RTEST(r_buffer_size) ? NUM2INT(r_buffer_size) : 0, 
    RTEST(r_replication) ? NUM2INT(r_replication) : 0, 
    RTEST(r_block_size) ? NUM2INT(r_block_size) : 0);
  if (file == NULL) {
    rb_raise(e_could_not_open, "Could not open file %s", RSTRING_PTR(path));
    return Qnil;
  }
  FileData* file_data = ALLOC_N(FileData, 1);
  file_data->fs = data->fs;
  file_data->file = file;
  VALUE file_instance = Data_Wrap_Struct(c_file, NULL, free_file_data, file_data);
  return file_instance;
}

#rename(from_path, to_path) ⇒ Object

Renames the file at the supplied path to the destination path. Returns True if successful; raises a DFSException if this fails.
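
For example (paths illustrative):

  dfs.rename('/tmp/old_name.txt', '/tmp/new_name.txt')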



# File 'ext/hdfs/hdfs.c', line 263

VALUE HDFS_File_System_rename(VALUE self, VALUE from_path, VALUE to_path) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (hdfsRename(data->fs, RSTRING_PTR(from_path), RSTRING_PTR(to_path)) < 0) {
    rb_raise(e_dfs_exception, "Could not rename path: %s to path: %s",
        RSTRING_PTR(from_path), RSTRING_PTR(to_path));
    return Qnil;
  }
  return Qtrue;
}

#set_replication(path, replication = 3) ⇒ Object

Sets the replication factor of the supplied path to the given number of nodes. Returns True if successful; raises a DFSException if this fails.
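
For example (path illustrative):

  dfs.set_replication('/user/hduser/important.dat', 5)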



# File 'ext/hdfs/hdfs.c', line 364

VALUE HDFS_File_System_set_replication(int argc, VALUE* argv, VALUE self) {
  VALUE path, replication;
  rb_scan_args(argc, argv, "11", &path, &replication);
  int hdfs_replication;
  // If no replication value is supplied, uses default replication value.
  if (NIL_P(replication)) {
    hdfs_replication = HDFS_DEFAULT_REPLICATION;
  } else {
    hdfs_replication = NUM2INT(replication);
  }
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (hdfsSetReplication(data->fs, RSTRING_PTR(path), hdfs_replication) < 0) {
    rb_raise(e_dfs_exception, "Failed to set replication to: %d at path: %s",
        hdfs_replication, RSTRING_PTR(path));
    return Qnil;
  }
  return Qtrue;
}

#stat(path) ⇒ Object

Stats the file or directory at the supplied path, returning a Hadoop::DFS::FileInfo object corresponding to it. If the file or directory does not exist, raises a DoesNotExistError.
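
For example (path illustrative; the size reader is assumed from the gem's Hadoop::DFS::FileInfo class):

  info = dfs.stat('/user/hduser/report.txt')
  puts info.size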



# File 'ext/hdfs/hdfs.c', line 343

VALUE HDFS_File_System_stat(VALUE self, VALUE path) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  hdfsFileInfo* info = hdfsGetPathInfo(data->fs, RSTRING_PTR(path));
  if (info == NULL) {
    rb_raise(e_does_not_exist, "File does not exist: %s", RSTRING_PTR(path));
    return Qnil;
  }
  VALUE file_info = wrap_hdfsFileInfo(info);
  hdfsFreeFileInfo(info, 1);
  return file_info;
}

#used ⇒ Object

Returns the bytes currently in use by this filesystem, raising a DFSException if unsuccessful.



# File 'ext/hdfs/hdfs.c', line 649

VALUE HDFS_File_System_used(VALUE self) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  long used = hdfsGetUsed(data->fs);
  if (used < 0) {
    rb_raise(e_dfs_exception, "Error while retrieving used capacity");
    return Qnil;
  }
  return LONG2NUM(used);
}

#utime(path, modified_time = nil, access_time = nil) ⇒ Object

Changes the last modified and/or last access time, in seconds since the Unix epoch, for the supplied file. Returns True if successful; raises a DFSException if this fails.
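
For example, setting only the modification time (path illustrative; the omitted access_time defaults to nil):

  dfs.utime('/user/hduser/report.txt', Time.now.to_i)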



# File 'ext/hdfs/hdfs.c', line 667

VALUE HDFS_File_System_utime(int argc, VALUE* argv, VALUE self) {
  VALUE path, modified_time, access_time;
  tTime hdfs_modified_time, hdfs_access_time;
  rb_scan_args(argc, argv, "12", &path, &modified_time, &access_time);
  // Sets default values for last modified and/or last access time.
  if (NIL_P(modified_time)) {
    hdfs_modified_time = -1;
  } else {
    hdfs_modified_time = NUM2LONG(modified_time);
  }
  if (NIL_P(access_time)) {
    hdfs_access_time = -1;
  } else {
    hdfs_access_time = NUM2LONG(access_time);
  }
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (hdfsUtime(data->fs, RSTRING_PTR(path), hdfs_modified_time,
      hdfs_access_time) < 0) {
    rb_raise(e_dfs_exception,
        "Error while setting modified time: %lu, access time: %lu at path: %s",
        (long) hdfs_modified_time, (long) hdfs_access_time, RSTRING_PTR(path));
    return Qnil;
  }
  return Qtrue;
}