Class: Hadoop::DFS::FileSystem

Inherits:
Object
Defined in:
ext/hdfs/hdfs.c

Instance Method Summary

Constructor Details

#new(options = {}) ⇒ Object

Creates a new HDFS client connection, returning a new Hadoop::DFS::FileSystem object if successful. If this fails, raises a ConnectError.

Options:

:local (if true, connects to the local file system instead of a NameNode)
:host (NameNode hostname; defaults to HDFS_DEFAULT_HOST if omitted)
:port (NameNode port; defaults to HDFS_DEFAULT_PORT if omitted)
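
For example, a minimal connection sketch (the hostname, port, and require path below are illustrative assumptions, not values defined by this extension):

  require 'hdfs'  # require path assumed; adjust to your gem's packaging

  # Connect to a remote NameNode.
  fs = Hadoop::DFS::FileSystem.new(:host => 'namenode.example.com', :port => 9000)

  # Or connect to the local file system.
  local_fs = Hadoop::DFS::FileSystem.new(:local => true)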


# File 'ext/hdfs/hdfs.c', line 169

VALUE HDFS_File_System_initialize(int argc, VALUE* argv, VALUE self) {
  VALUE options;
  rb_scan_args(argc, argv, "01", &options);

  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);

  if (NIL_P(options)) {
    options = rb_hash_new();
  }

  VALUE r_local = rb_hash_aref(options, rb_eval_string(":local"));
  if (r_local == Qtrue) {
    data->fs = hdfsConnect(NULL, 0);
  } else {
    VALUE r_host = rb_hash_aref(options, rb_eval_string(":host"));
    VALUE r_port = rb_hash_aref(options, rb_eval_string(":port"));

    // Sets default values for host and port if not supplied by user.
    char* hdfs_host = (char*) HDFS_DEFAULT_HOST;
    int hdfs_port   = HDFS_DEFAULT_PORT;

    if (RTEST(r_host)) {
      hdfs_host = RSTRING_PTR(r_host);
    }

    if (RTEST(r_port)) {
      hdfs_port = NUM2INT(r_port);
    }

    data->fs = hdfsConnect(hdfs_host, hdfs_port);     
  }
 
  if (data->fs == NULL) {
    rb_raise(e_connect_error, "Failed to connect to HDFS");
    return Qnil;
  } 

  return self;
}

Instance Method Details

#capacity ⇒ Object

Returns the capacity of this HDFS file system in bytes, raising a DFSException if this was unsuccessful.
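
A short sketch, assuming fs is a connected FileSystem (see #new):

  bytes = fs.capacity
  puts "capacity: #{bytes / (1024 ** 3)} GiB"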



# File 'ext/hdfs/hdfs.c', line 554

VALUE HDFS_File_System_capacity(VALUE self) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  long capacity = hdfsGetCapacity(data->fs);
  if (capacity < 0) {
    rb_raise(e_dfs_exception, "Error while retrieving capacity");
    return Qnil;
  }
  return LONG2NUM(capacity);
}

#cd(path) ⇒ Object

Changes the current working directory to the supplied path. Returns True if successful; raises a DFSException if this fails.
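
For instance, assuming a connected fs and an existing /tmp directory:

  fs.cd('/tmp')  # => true
  fs.cwd         # the new working directory, possibly as a fully qualified URI (see #cwd)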



# File 'ext/hdfs/hdfs.c', line 385

VALUE HDFS_File_System_cd(VALUE self, VALUE path) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (hdfsSetWorkingDirectory(data->fs, RSTRING_PTR(path)) < 0) {
    rb_raise(e_dfs_exception,
        "Failed to change current working directory to path: %s",
        RSTRING_PTR(path));
    return Qnil;
  }
  return Qtrue;
}

#chgrp(path, group) ⇒ Object

Changes the group of the supplied path. Returns True if successful; raises a DFSException if this fails.



# File 'ext/hdfs/hdfs.c', line 424

VALUE HDFS_File_System_chgrp(VALUE self, VALUE path, VALUE group) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (hdfsChgrp(data->fs, RSTRING_PTR(path), NULL, RSTRING_PTR(group)) < 0) {
    rb_raise(e_dfs_exception, "Failed to chgrp path: %s to group: %s",
        RSTRING_PTR(path), RSTRING_PTR(group));
    return Qnil;
  }
  return Qtrue;
}

#chmod(path, mode = 644) ⇒ Object

Changes the mode of the supplied path. Returns True if successful; raises a DFSException if this fails.
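
The mode is written with its octal digits as an ordinary integer (e.g. 644, not 0644); the extension converts the digits internally. A sketch with hypothetical paths:

  fs.chmod('/user/alice/report.txt', 644)  # rw-r--r--
  fs.chmod('/user/alice/bin', 755)         # rwxr-xr-x
  fs.chmod('/user/alice/notes.txt')        # default mode, 644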



# File 'ext/hdfs/hdfs.c', line 442

VALUE HDFS_File_System_chmod(int argc, VALUE* argv, VALUE self) {
  VALUE path, mode;
  rb_scan_args(argc, argv, "11", &path, &mode);
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  short hdfs_mode;
  // Sets default mode if none is supplied.
  if (NIL_P(mode)) {
    hdfs_mode = HDFS_DEFAULT_MODE;
  } else {
    hdfs_mode = octal_decimal(NUM2INT(mode));
  }
  if (hdfsChmod(data->fs, RSTRING_PTR(path), hdfs_mode) < 0){
    rb_raise(e_dfs_exception, "Failed to chmod user path: %s to mode: %d",
        RSTRING_PTR(path), hdfs_mode);
    return Qnil;
  }
  return Qtrue;
}

#chown(path, owner) ⇒ Object

Changes the owner of the supplied path. Returns True if successful; raises a DFSException if this fails.
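
Both #chown and #chgrp take the new principal as a string; a sketch with hypothetical path, user, and group names:

  fs.chown('/user/alice/report.txt', 'alice')  # => true
  fs.chgrp('/user/alice/report.txt', 'staff')  # => true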



# File 'ext/hdfs/hdfs.c', line 469

VALUE HDFS_File_System_chown(VALUE self, VALUE path, VALUE owner) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (hdfsChown(data->fs, RSTRING_PTR(path), RSTRING_PTR(owner), NULL) < 0) {
    rb_raise(e_dfs_exception, "Failed to chown user path: %s to user: %s",
        RSTRING_PTR(path), RSTRING_PTR(owner));
    return Qnil;
  }
  return Qtrue;
}

#copy(from_path, to_path, to_fs = nil) ⇒ Object

Copies the file at HDFS location from_path to HDFS location to_path. If to_fs is specified, copies to that HDFS instead of the current one. If successful, returns True; otherwise, raises a DFSException.
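
A sketch of both forms, with hypothetical paths and hosts:

  # Copy within the current HDFS.
  fs.copy('/data/input.csv', '/backup/input.csv')

  # Copy to another cluster by passing its FileSystem.
  backup_fs = Hadoop::DFS::FileSystem.new(:host => 'backup-nn.example.com', :port => 9000)
  fs.copy('/data/input.csv', '/data/input.csv', backup_fs)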



# File 'ext/hdfs/hdfs.c', line 488

VALUE HDFS_File_System_copy(int argc, VALUE* argv, VALUE self) {
  VALUE from_path, to_path, to_fs;
  rb_scan_args(argc, argv, "21", &from_path, &to_path, &to_fs);
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  hdfsFS destFS = data->fs;
  // If no to_fs is supplied, copies to the current file system.
  if (!NIL_P(to_fs)) {
    if (CLASS_OF(to_fs) == c_file_system) {
      FSData* destFSData = NULL;
      Data_Get_Struct(to_fs, FSData, destFSData);
      destFS = destFSData->fs;
    } else {
      rb_raise(rb_eArgError, "to_fs must be of type Hadoop::DFS::FileSystem");
      return Qnil;
    }
  }
  if (hdfsCopy(data->fs, RSTRING_PTR(from_path), destFS, 
      RSTRING_PTR(to_path)) < 0) {
    rb_raise(e_dfs_exception, "Failed to copy path: %s to path: %s",
        RSTRING_PTR(from_path), RSTRING_PTR(to_path));
    return Qnil;
  }
  return Qtrue;
}

#create_directory(path) ⇒ Object

Creates a directory at the supplied path. If successful, returns True; raises a DFSException if this fails.



# File 'ext/hdfs/hdfs.c', line 289

VALUE HDFS_File_System_create_directory(VALUE self, VALUE path) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (hdfsCreateDirectory(data->fs, RSTRING_PTR(path)) < 0) {
    rb_raise(e_dfs_exception, "Could not create directory at path: %s",
        RSTRING_PTR(path));
    return Qnil;
  }
  return Qtrue;
}

#cwd ⇒ Object

Returns the current working directory; raises a DFSException if this fails.



# File 'ext/hdfs/hdfs.c', line 403

VALUE HDFS_File_System_cwd(VALUE self) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  char* cur_dir = (char *) malloc(
      sizeof(char) * HDFS_DEFAULT_PATH_STRING_LENGTH);
  if (hdfsGetWorkingDirectory(data->fs, cur_dir,
      HDFS_DEFAULT_PATH_STRING_LENGTH) < 0) {
    free(cur_dir);
    rb_raise(e_dfs_exception, "Failed to get current working directory");
    return Qnil;
  }
  // Frees the malloc'd buffer after copying it into a Ruby string.
  VALUE ruby_cur_dir = rb_str_new2(cur_dir);
  free(cur_dir);
  return ruby_cur_dir;
}

#default_block_size ⇒ Object

Returns the default block size of this HDFS file system in bytes, raising a DFSException if this was unsuccessful.



# File 'ext/hdfs/hdfs.c', line 572

VALUE HDFS_File_System_default_block_size(VALUE self) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  long block_size = hdfsGetDefaultBlockSize(data->fs);
  if (block_size < 0) {
    rb_raise(e_dfs_exception, "Error while retrieving default block size");
    return Qnil;
  }
  return LONG2NUM(block_size);
}

#default_block_size_at_path(path) ⇒ Object

Returns the default block size at the supplied HDFS path, raising a DFSException if this was unsuccessful.



# File 'ext/hdfs/hdfs.c', line 590

VALUE HDFS_File_System_default_block_size_at_path(VALUE self, VALUE path) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  long block_size = hdfsGetDefaultBlockSizeAtPath(data->fs, RSTRING_PTR(path));
  if (block_size < 0) {
    rb_raise(e_dfs_exception,
        "Error while retrieving default block size at path: %s", RSTRING_PTR(path));
    return Qnil;
  }
  return LONG2NUM(block_size);
}

#delete(path, recursive = false) ⇒ Object

Deletes the file at the supplied path, recursively if specified. Returns True if successful; raises a DFSException if this fails.
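
A sketch with hypothetical paths; deleting a non-empty directory requires recursive to be true:

  fs.delete('/tmp/scratch.txt')        # => true
  fs.delete('/tmp/scratch_dir', true)  # directory and its contents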



# File 'ext/hdfs/hdfs.c', line 233

VALUE HDFS_File_System_delete(int argc, VALUE* argv, VALUE self) {
  VALUE path, recursive;
  rb_scan_args(argc, argv, "11", &path, &recursive);
  int hdfs_recursive = HDFS_DEFAULT_RECURSIVE_DELETE;
  if (!NIL_P(recursive)) {
    hdfs_recursive = (recursive == Qtrue) ? 1 : 0;
  }
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (hdfsDelete(data->fs, RSTRING_PTR(path), hdfs_recursive) < 0) {
    rb_raise(e_dfs_exception, "Could not delete file at path: %s",
        RSTRING_PTR(path));
    return Qnil;
  }
  return Qtrue;
}

#disconnect ⇒ nil

Disconnects the client connection.

Returns:

  • (nil)


# File 'ext/hdfs/hdfs.c', line 216

VALUE HDFS_File_System_disconnect(VALUE self) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (data->fs != NULL) {
    hdfsDisconnect(data->fs);
    data->fs = NULL;
  }
  return Qnil;
}

#exist(path) ⇒ Boolean

Checks whether a file exists at the supplied path. Returns True if the file exists; False if not.

Returns:

  • (Boolean)
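
This works well as a guard, for example paired with #create_directory above (path hypothetical):

  fs.create_directory('/tmp/job_output') unless fs.exist('/tmp/job_output')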


# File 'ext/hdfs/hdfs.c', line 275

VALUE HDFS_File_System_exist(VALUE self, VALUE path) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  int success = hdfsExists(data->fs, RSTRING_PTR(path));
  return success == 0 ? Qtrue : Qfalse;
}

#list_directory(path) ⇒ Object

Lists the directory at the supplied path, returning an Array of Hadoop::DFS::FileInfo objects. If the directory does not exist, raises a DoesNotExistError.
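
A sketch that iterates the result; the accessor names used here are assumptions, so check Hadoop::DFS::FileInfo for the exact interface:

  fs.list_directory('/user/alice').each do |info|
    puts "#{info.name}\t#{info.size}"
  end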



# File 'ext/hdfs/hdfs.c', line 308

VALUE HDFS_File_System_list_directory(VALUE self, VALUE path) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  VALUE file_infos = rb_ary_new();
  int num_files = 0;
  hdfsFileInfo* infos = hdfsListDirectory(data->fs, RSTRING_PTR(path),
      &num_files);
  if (infos == NULL) {
    rb_raise(e_does_not_exist, "Directory does not exist: %s",
        RSTRING_PTR(path));
    return Qnil;
  }
  int i;
  for (i = 0; i < num_files; i++) {
    hdfsFileInfo* cur_info = infos + i;
    rb_ary_push(file_infos, wrap_hdfsFileInfo(cur_info));
  }
  hdfsFreeFileInfo(infos, num_files);
  return file_infos;
}

#move(from_path, to_path, to_fs = nil) ⇒ Object

Moves the file at HDFS location from_path to HDFS location to_path. If to_fs is specified, moves to that HDFS instead of the current one. If successful, returns True; otherwise, raises a DFSException.



# File 'ext/hdfs/hdfs.c', line 522

VALUE HDFS_File_System_move(int argc, VALUE* argv, VALUE self) {
  VALUE from_path, to_path, to_fs;
  rb_scan_args(argc, argv, "21", &from_path, &to_path, &to_fs);
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  hdfsFS destFS = data->fs;
  // If no to_fs is supplied, moves to the current file system.
  if (!NIL_P(to_fs)) {
    if (CLASS_OF(to_fs) == c_file_system) {
      FSData* destFSData = NULL;
      Data_Get_Struct(to_fs, FSData, destFSData);
      destFS = destFSData->fs;
    } else {
      rb_raise(rb_eArgError, "to_fs must be of type Hadoop::DFS::FileSystem");
      return Qnil;
    }
  }
  if (hdfsMove(data->fs, RSTRING_PTR(from_path), destFS,
      RSTRING_PTR(to_path)) < 0) {
    rb_raise(e_dfs_exception, "Failed to move path: %s to path: %s",
        RSTRING_PTR(from_path), RSTRING_PTR(to_path));
    return Qnil;
  }
  return Qtrue;
}

#open(path, mode = 'r', options = {}) ⇒ File

Opens a file. If the file cannot be opened, raises a CouldNotOpenError; otherwise, returns a Hadoop::DFS::File object corresponding to the file.
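
A read/write sketch with a hypothetical path; the read, write, and close calls on the returned object are assumed from the Hadoop::DFS::File interface:

  file = fs.open('/tmp/greeting.txt', 'w')
  file.write('hello from ruby-hdfs')
  file.close

  file = fs.open('/tmp/greeting.txt', 'r', :buffer_size => 8192)
  puts file.read(1024)
  file.close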

Returns:

  • (File)



# File 'ext/hdfs/hdfs.c', line 661

VALUE HDFS_File_System_open(int argc, VALUE* argv, VALUE self) {
  VALUE path, mode, options;
  int flags = O_RDONLY;
  rb_scan_args(argc, argv, "12", &path, &mode, &options);
  // Sets file open mode if one is provided by the user.
  if (!NIL_P(mode)) {
    if (strcmp("r", StringValuePtr(mode)) == 0) {
      flags = O_RDONLY;
    } else if (strcmp("w", StringValuePtr(mode)) == 0) {
      flags = O_WRONLY;
    } else {
      rb_raise(rb_eArgError, "Mode must be 'r' or 'w'");
      return Qnil;
    }
  }
  if (NIL_P(options)) {
    options = rb_hash_new();
  }
  VALUE r_buffer_size = rb_hash_aref(options, rb_eval_string(":buffer_size"));
  VALUE r_replication = rb_hash_aref(options, rb_eval_string(":replication"));
  VALUE r_block_size = rb_hash_aref(options, rb_eval_string(":block_size"));
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  hdfsFile file = hdfsOpenFile(data->fs, RSTRING_PTR(path), flags, 
    RTEST(r_buffer_size) ? NUM2INT(r_buffer_size) : 0, 
    RTEST(r_replication) ? NUM2INT(r_replication) : 0, 
    RTEST(r_block_size) ? NUM2INT(r_block_size) : HDFS_DEFAULT_BLOCK_SIZE);
  if (file == NULL) {
    rb_raise(e_could_not_open, "Could not open file %s", RSTRING_PTR(path));
    return Qnil;
  }
  FileData* file_data = ALLOC_N(FileData, 1);
  file_data->fs = data->fs;
  file_data->file = file;
  VALUE file_instance = Data_Wrap_Struct(c_file, NULL, free_file_data, file_data);
  return file_instance;
}

#rename(from_path, to_path) ⇒ Object

Renames the file at the supplied path to the file at the destination path. Returns True if successful; raises a DFSException if this fails.



# File 'ext/hdfs/hdfs.c', line 257

VALUE HDFS_File_System_rename(VALUE self, VALUE from_path, VALUE to_path) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (hdfsRename(data->fs, RSTRING_PTR(from_path), RSTRING_PTR(to_path)) < 0) {
    rb_raise(e_dfs_exception, "Could not rename path: %s to path: %s",
        RSTRING_PTR(from_path), RSTRING_PTR(to_path));
    return Qnil;
  }
  return Qtrue;
}

#set_replication(path, replication = 3) ⇒ Object

Sets the replication factor of the supplied path to the supplied number of nodes. Returns True if successful; raises a DFSException if this fails.
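
A sketch with hypothetical paths:

  fs.set_replication('/data/important.csv', 5)  # replicate across 5 nodes
  fs.set_replication('/data/other.csv')         # default replication factor, 3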



# File 'ext/hdfs/hdfs.c', line 358

VALUE HDFS_File_System_set_replication(int argc, VALUE* argv, VALUE self) {
  VALUE path, replication;
  rb_scan_args(argc, argv, "11", &path, &replication);
  int hdfs_replication;
  // If no replication value is supplied, uses default replication value.
  if (NIL_P(replication)) {
    hdfs_replication = HDFS_DEFAULT_REPLICATION;
  } else {
    hdfs_replication = NUM2INT(replication);
  }
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (hdfsSetReplication(data->fs, RSTRING_PTR(path), hdfs_replication) < 0) {
    rb_raise(e_dfs_exception, "Failed to set replication to: %d at path: %s",
        hdfs_replication, RSTRING_PTR(path));
    return Qnil;
  }
  return Qtrue;
}

#stat(path) ⇒ Object

Stats the file or directory at the supplied path, returning a Hadoop::DFS::FileInfo object corresponding to it. If the file or directory does not exist, raises a DoesNotExistError.
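
A sketch with a hypothetical path; the size accessor on the returned FileInfo is an assumption, so check that class for exact names:

  info = fs.stat('/user/alice/report.txt')
  puts info.size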



# File 'ext/hdfs/hdfs.c', line 337

VALUE HDFS_File_System_stat(VALUE self, VALUE path) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  hdfsFileInfo* info = hdfsGetPathInfo(data->fs, RSTRING_PTR(path));
  if (info == NULL) {
    rb_raise(e_does_not_exist, "File does not exist: %s", RSTRING_PTR(path));
    return Qnil;
  }
  VALUE file_info = wrap_hdfsFileInfo(info);
  hdfsFreeFileInfo(info, 1);
  return file_info;
}

#used ⇒ Object

Returns the bytes currently in use by this filesystem, raising a DFSException if unsuccessful.



# File 'ext/hdfs/hdfs.c', line 609

VALUE HDFS_File_System_used(VALUE self) {
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  long used = hdfsGetUsed(data->fs);
  if (used < 0) {
    rb_raise(e_dfs_exception, "Error while retrieving used capacity");
    return Qnil;
  }
  return LONG2NUM(used);
}

#utime(path, modified_time = nil, access_time = nil) ⇒ Object

Changes the last modified time and/or last access time, in seconds since the Unix epoch, for the supplied file. Returns True if successful; raises a DFSException if this fails.
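
Passing nil for either time leaves it unchanged. A sketch with a hypothetical path:

  fs.utime('/tmp/greeting.txt', Time.now.to_i)       # set modified time only
  fs.utime('/tmp/greeting.txt', nil, Time.now.to_i)  # set access time only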



# File 'ext/hdfs/hdfs.c', line 627

VALUE HDFS_File_System_utime(int argc, VALUE* argv, VALUE self) {
  VALUE path, modified_time, access_time;
  tTime hdfs_modified_time, hdfs_access_time;
  rb_scan_args(argc, argv, "12", &path, &modified_time, &access_time);
  // Sets default values for last modified and/or last access time.
  if (NIL_P(modified_time)) {
    hdfs_modified_time = -1;
  } else {
    hdfs_modified_time = NUM2LONG(modified_time);
  }
  if (NIL_P(access_time)) {
    hdfs_access_time = -1;
  } else {
    hdfs_access_time = NUM2LONG(access_time);
  }
  FSData* data = NULL;
  Data_Get_Struct(self, FSData, data);
  if (hdfsUtime(data->fs, RSTRING_PTR(path), hdfs_modified_time,
      hdfs_access_time) < 0) {
    rb_raise(e_dfs_exception,
        "Error while setting modified time: %lu, access time: %lu at path: %s",
        (long) hdfs_modified_time, (long) hdfs_access_time, RSTRING_PTR(path));
    return Qnil;
  }
  return Qtrue;
}