Class: Ganapati::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/ganapati/client.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server, port, timeout = 60) ⇒ Client

Returns a new instance of Client.



4
5
6
7
8
9
10
11
# File 'lib/ganapati/client.rb', line 4

def initialize(server, port, timeout=60)
  socket = Thrift::Socket.new(server, port)
  @transport = Thrift::BufferedTransport.new(socket)
  @transport.open
  protocol = Thrift::BinaryProtocol.new(@transport)
  @client = ThriftHadoopFileSystem::Client.new(protocol)
  @client.setInactivityTimeoutPeriod(timeout)
end

Class Method Details

.run(server, port) ⇒ Object



125
126
127
128
129
130
# File 'lib/ganapati/client.rb', line 125

def self.run(server, port)
  c = Client.new(server, port)
  result = yield c
  c.close
  result
end

Instance Method Details

#append(path, &block) ⇒ Object

for appending



82
83
84
# File 'lib/ganapati/client.rb', line 82

def append(path, &block)
  file_handle :append, path, &block
end

#chmod(path, mode) ⇒ Object



113
114
115
# File 'lib/ganapati/client.rb', line 113

def chmod(path, mode)
  @client.chmod pname(path), mode
end

#chown(path, owner, group) ⇒ Object



117
118
119
# File 'lib/ganapati/client.rb', line 117

def chown(path, owner, group)
  @client.chown pname(path), owner, group
end

#closeObject



13
14
15
# File 'lib/ganapati/client.rb', line 13

def close
  @transport.close
end

#create(path, &block) ⇒ Object

for writing to a new file



72
73
74
# File 'lib/ganapati/client.rb', line 72

def create(path, &block)
  file_handle :create, path, &block
end

#exists?(path) ⇒ Boolean

Returns:

  • (Boolean)


98
99
100
# File 'lib/ganapati/client.rb', line 98

def exists?(path)
  @client.exists pname(path)
end

#get(remotepath, destpath) ⇒ Object

copy remote file to local



35
36
37
38
39
40
41
# File 'lib/ganapati/client.rb', line 35

def get(remotepath, destpath)
  Kernel.open(destpath, 'w') { |dest|
    readchunks(remotepath) { |chunk|
      dest.write chunk
    }
  }
end

#ls(path, details = false, recursive = false) ⇒ Object



106
107
108
109
110
111
# File 'lib/ganapati/client.rb', line 106

def ls(path, details=false, recursive=false)
  statuses = @client.listStatus pname(path)
  paths = (details) ? statuses : statuses.map { |s| s.path }
  return paths if not recursive
  paths + statuses.select { |s| s.isdir }.map { |s| ls(s.path, details, recursive) }.flatten
end

#mkdir(path) ⇒ Object



94
95
96
# File 'lib/ganapati/client.rb', line 94

def mkdir(path)
  @client.mkdirs pname(path)
end

#mv(source, dest) ⇒ Object



90
91
92
# File 'lib/ganapati/client.rb', line 90

def mv(source, dest)
  @client.rename pname(source), pname(dest)
end

#open(path, &block) ⇒ Object

for reading



77
78
79
# File 'lib/ganapati/client.rb', line 77

def open(path, &block)
  file_handle :open, path, &block
end

#put(localpath, destpath) ⇒ Object

copy local file to remote



23
24
25
26
27
28
29
30
31
32
# File 'lib/ganapati/client.rb', line 23

def put(localpath, destpath)
  create(destpath) { |dest|
    Kernel.open(localpath) { |source|
      # read 1 MB at a time
      while record = source.read(1048576)
        dest.write(record)
      end
    }
  }
end

#readchunks(path, chunksize = 1048576) ⇒ Object

yeild chunksize of path one chunk at a time



44
45
46
47
48
49
50
51
52
53
# File 'lib/ganapati/client.rb', line 44

def readchunks(path, chunksize=1048576)
  open(path) { |source|
    size = source.length
    index = 0
    while index < size
      yield source.read(index, chunksize)
      index += chunksize
    end
  }
end

#readlines(path, sep = "\n") {|buffer| ... } ⇒ Object

Yields:

  • (buffer)


55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/ganapati/client.rb', line 55

def readlines(path, sep="\n")
  buffer = ""

  readchunks(path) { |chunk|
    buffer << chunk

    # partitions[1] will be empty if sep does not exist in the string
    while partitions = buffer.partition(sep) and !partitions[1].empty?
      yield partitions.first
      buffer = partitions.last
    end
  }

  yield buffer if buffer.size > 0
end

#rm(path, recursive = false) ⇒ Object



86
87
88
# File 'lib/ganapati/client.rb', line 86

def rm(path, recursive=false)
  @client.rm pname(path), recursive
end

#set_replication(path, level) ⇒ Object



121
122
123
# File 'lib/ganapati/client.rb', line 121

def set_replication(path, level)
  @client.setReplication pname(path), level
end

#shutdown(status = 0) ⇒ Object

shutdown the thrift server



18
19
20
# File 'lib/ganapati/client.rb', line 18

def shutdown(status=0)
  @client.shutdown status
end

#stat(path) ⇒ Object



102
103
104
# File 'lib/ganapati/client.rb', line 102

def stat(path)
  @client.stat pname(path)
end