Class: HBase

Inherits:
Object
  • Object
show all
Includes:
Admin
Defined in:
lib/hbase-jruby/hbase.rb,
lib/hbase-jruby/row.rb,
lib/hbase-jruby/cell.rb,
lib/hbase-jruby/util.rb,
lib/hbase-jruby/admin.rb,
lib/hbase-jruby/table.rb,
lib/hbase-jruby/schema.rb,
lib/hbase-jruby/scoped.rb,
lib/hbase-jruby/version.rb,
lib/hbase-jruby/byte_array.rb,
lib/hbase-jruby/dependency.rb,
lib/hbase-jruby/table/admin.rb,
lib/hbase-jruby/table/mutation.rb,
lib/hbase-jruby/batch_exception.rb,
lib/hbase-jruby/table/inspection.rb,
lib/hbase-jruby/scoped/aggregation.rb,
lib/hbase-jruby/table/batch_action.rb,
lib/hbase-jruby/table/checked_operation.rb

Overview

HBase connection

Defined Under Namespace

Modules: Admin, JRuby, Util Classes: BatchException, ByteArray, Cell, Row, Schema, Scoped, Table

Constant Summary collapse

Result =

For backward compatibility

HBase::Row
SUPPORTED_PROFILES =
{
  # Prefix => Latest known version
  'cdh4.3' => 'cdh4.3.0',
  'cdh4.2' => 'cdh4.2.1',
  'cdh4.1' => 'cdh4.1.4',
  'cdh3'   => 'cdh3u6',
  '0.95'   => '0.95.0',
  '0.94'   => '0.94.9',
  '0.92'   => '0.92.2',
}

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config = {}) ⇒ HBase

Connects to HBase

Parameters:

  • config (Hash) (defaults to: {})

    A key-value pairs to build HBaseConfiguration from



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/hbase-jruby/hbase.rb', line 44

def initialize config = {}
  begin
    org.apache.hadoop.conf.Configuration
  rescue NameError
    raise NameError.new(
      "Required Java classes not loaded. Set up CLASSPATH or try `HBase.resolve_dependency!`")
  end

  HBase.import_java_classes!

  @config =
    case config
    when org.apache.hadoop.conf.Configuration
      config
    else
      HBaseConfiguration.create.tap do |hbcfg|
        config.each do |k, v|
          hbcfg.set k.to_s, v.to_s
        end
      end
    end
  @htable_pool = HTablePool.new @config, java.lang.Integer::MAX_VALUE
  @threads = Set.new
  @mutex   = Mutex.new
  @schema  = Schema.new
  @closed  = false
end

Instance Attribute Details

#configObject (readonly)



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/hbase-jruby/hbase.rb', line 8

class HBase
  attr_reader :config, :schema

  include Admin

  # @overload HBase.log4j=(filename)
  #   Configure Log4j logging with the given file
  #   @param [String] filename Path to log4j.properties or log4j.xml file
  #   @return [nil]
  # @overload HBase.log4j=(hash)
  #   Configure Log4j logging with the given Hash
  #   @param [Hash] hash Log4j properties in Ruby Hash
  #   @return [nil]
  # @overload HBase.log4j=(props)
  #   Configure Log4j logging with the given Properties
  #   @param [java.util.Properties] props Properties object
  #   @return [nil]
  def self.log4j= arg
    if arg.is_a?(Hash)
      props = java.util.Properties.new
      arg.each do |k, v|
        props.setProperty k.to_s, v.to_s
      end
      org.apache.log4j.PropertyConfigurator.configure props
    else
      case File.extname(arg).downcase
      when '.xml'
        org.apache.log4j.xml.DOMConfigurator.configure arg
      else
        org.apache.log4j.PropertyConfigurator.configure arg
      end
    end
  end

  # Connects to HBase
  # @param [Hash] config A key-value pairs to build HBaseConfiguration from
  def initialize config = {}
    begin
      org.apache.hadoop.conf.Configuration
    rescue NameError
      raise NameError.new(
        "Required Java classes not loaded. Set up CLASSPATH or try `HBase.resolve_dependency!`")
    end

    HBase.import_java_classes!

    @config =
      case config
      when org.apache.hadoop.conf.Configuration
        config
      else
        HBaseConfiguration.create.tap do |hbcfg|
          config.each do |k, v|
            hbcfg.set k.to_s, v.to_s
          end
        end
      end
    @htable_pool = HTablePool.new @config, java.lang.Integer::MAX_VALUE
    @threads = Set.new
    @mutex   = Mutex.new
    @schema  = Schema.new
    @closed  = false
  end

  # Returns an HBaseAdmin object for administration
  # @yield [admin] An HBaseAdmin object
  # @yieldparam [org.apache.hadoop.hbase.client.HBaseAdmin] admin
  # @return [org.apache.hadoop.hbase.client.HBaseAdmin]
  def admin
    if block_given?
      with_admin { |admin| yield admin }
    else
      check_closed
      HBaseAdmin.new @config
    end
  end

  # Closes HTablePool and connection
  # @return [nil]
  def close
    @mutex.synchronize do
      unless @closed
        @closed = true
        close_table_pool
        HConnectionManager.deleteConnection(@config, true)
      end
    end
  end

  # Returns whether if the connection is closed
  # @return [Boolean]
  def closed?
    @closed
  end

  # Returns the list of HBase::Table instances
  # @return [Array<HBase::Table>]
  def tables
    check_closed
    table_names.map { |tn| table(tn) }
  end

  # Returns the list of table names
  # @return [Array<String>]
  def table_names
    check_closed
    with_admin { |admin| admin.list_tables.map(&:name_as_string) }
  end
  alias list table_names

  # Creates an HBase::Table instance for the specified name
  # @param [#to_s] table_name The name of the table
  # @return [HBase::Table]
  def table table_name
    check_closed

    ht = HBase::Table.send :new, self, @config, table_name

    if block_given?
      yield ht
    else
      ht
    end
  end
  alias [] table

  # Returns an Array of snapshot information
  # @return [Array<Hash>]
  def snapshots
    with_admin { |admin| admin.listSnapshots }.map { |sd|
      props = sd.getAllFields.map { |k, v|
        [k.name.to_sym, v.respond_to?(:name) ? v.name : v]
      }
      Hash[props]
    }
  end

  # @param [Hash] hash
  # @return [HBase::Schema]
  def schema= hash
    unless hash.is_a?(Hash)
      raise ArgumentError, "invalid schema: Hash required"
    end

    schema = Schema.new
    hash.each do |table, definition|
      schema[table] = definition
    end
    @schema = schema
 end

  # Reset underlying HTablePool
  # @return [nil]
  def reset_table_pool
    @mutex.synchronize do
      close_table_pool
      @htable_pool = HTablePool.new @config, java.lang.Integer::MAX_VALUE
    end
    nil
  end

private
  def register_thread t
    @mutex.synchronize do
      check_closed
      @threads << t
    end
  end

  def close_table_pool
    # Close all the HTable instances in the pool
    @htable_pool.close

    # Cleanup thread-local references
    @threads.each do |thr|
      thr[:hbase_jruby].delete self
    end
  end

  def get_htable name
    @htable_pool.get_table name
  end

  def check_closed
    raise RuntimeError, "Connection already closed" if closed?
  end
end

#javaObject (readonly)



# File 'lib/hbase-jruby/cell.rb', line 3

#nameObject (readonly)



# File 'lib/hbase-jruby/table.rb', line 5

#schemaObject

Returns the value of attribute schema.



9
10
11
# File 'lib/hbase-jruby/hbase.rb', line 9

def schema
  @schema
end

#tableObject (readonly) Also known as: []



121
122
123
124
125
126
127
128
129
130
131
# File 'lib/hbase-jruby/hbase.rb', line 121

def table table_name
  check_closed

  ht = HBase::Table.send :new, self, @config, table_name

  if block_given?
    yield ht
  else
    ht
  end
end

Class Method Details

.ByteArray(*values) ⇒ HBase::ByteArray

Shortcut method to HBase::ByteArray.new

Parameters:

  • values (*Object)

Returns:



6
7
8
# File 'lib/hbase-jruby/byte_array.rb', line 6

def ByteArray *values
  ByteArray.new(*values)
end

.import_java_classes!Array<String>

Import Java classes (Prerequisite for classes in hbase-jruby)

Returns:

  • (Array<String>)

    List of Java classes NOT found



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/hbase-jruby/dependency.rb', line 120

def import_java_classes!
  imp = lambda { |hash|
    hash.map { |base, classes|
      base.class_eval do
        classes.map { |klass|
          begin
            java_import klass
            nil
          rescue NameError => e
            klass
          end
        }.compact
      end
    }.flatten
  }

  imp.call(
    HBase => %w[
      org.apache.hadoop.hbase.HBaseConfiguration
      org.apache.hadoop.hbase.client.HBaseAdmin
      org.apache.hadoop.hbase.client.HConnectionManager
      org.apache.hadoop.hbase.client.HTablePool
    ],
    HBase::Util => %w[
      java.nio.ByteBuffer
      org.apache.hadoop.hbase.KeyValue
      org.apache.hadoop.hbase.util.Bytes
    ],
    HBase::ByteArray => %w[
      java.util.Arrays
      org.apache.hadoop.hbase.util.Bytes
    ],
    HBase::Cell => %w[
      org.apache.hadoop.hbase.KeyValue
    ],
    HBase::Result => %w[
      org.apache.hadoop.hbase.util.Bytes
    ],
    HBase::Table => %w[
      org.apache.hadoop.hbase.HColumnDescriptor
      org.apache.hadoop.hbase.HTableDescriptor
      org.apache.hadoop.hbase.client.Append
      org.apache.hadoop.hbase.client.Delete
      org.apache.hadoop.hbase.client.Increment
      org.apache.hadoop.hbase.client.Put
      org.apache.hadoop.hbase.client.RowMutations
      org.apache.hadoop.hbase.io.hfile.Compression
      org.apache.hadoop.hbase.regionserver.StoreFile
    ],
    HBase::Scoped => %w[
      org.apache.hadoop.hbase.client.Get
      org.apache.hadoop.hbase.client.Scan
      org.apache.hadoop.hbase.filter.BinaryComparator
      org.apache.hadoop.hbase.filter.ColumnPaginationFilter
      org.apache.hadoop.hbase.filter.ColumnRangeFilter
      org.apache.hadoop.hbase.filter.CompareFilter
      org.apache.hadoop.hbase.filter.FilterBase
      org.apache.hadoop.hbase.filter.FilterList
      org.apache.hadoop.hbase.filter.KeyOnlyFilter
      org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter
      org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter
      org.apache.hadoop.hbase.filter.PrefixFilter
      org.apache.hadoop.hbase.filter.RegexStringComparator
      org.apache.hadoop.hbase.filter.RowFilter
      org.apache.hadoop.hbase.filter.SingleColumnValueFilter
      org.apache.hadoop.hbase.filter.WhileMatchFilter
      org.apache.hadoop.hbase.client.coprocessor.AggregationClient
      org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter
    ]).tap { |not_found|

    if not_found.empty?
      self.instance_eval do
        def import_java_classes!
          []
        end
      end
    end
  }
end

.HBase.log4j=(filename) ⇒ nil .HBase.log4j=(hash) ⇒ nil .HBase.log4j=(props) ⇒ nil

Overloads:

  • .HBase.log4j=(filename) ⇒ nil

    Configure Log4j logging with the given file

    Parameters:

    • filename (String)

      Path to log4j.properties or log4j.xml file

    Returns:

    • (nil)
  • .HBase.log4j=(hash) ⇒ nil

    Configure Log4j logging with the given Hash

    Parameters:

    • hash (Hash)

      Log4j properties in Ruby Hash

    Returns:

    • (nil)
  • .HBase.log4j=(props) ⇒ nil

    Configure Log4j logging with the given Properties

    Parameters:

    • props (java.util.Properties)

      Properties object

    Returns:

    • (nil)


25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/hbase-jruby/hbase.rb', line 25

def self.log4j= arg
  if arg.is_a?(Hash)
    props = java.util.Properties.new
    arg.each do |k, v|
      props.setProperty k.to_s, v.to_s
    end
    org.apache.log4j.PropertyConfigurator.configure props
  else
    case File.extname(arg).downcase
    when '.xml'
      org.apache.log4j.xml.DOMConfigurator.configure arg
    else
      org.apache.log4j.PropertyConfigurator.configure arg
    end
  end
end

.resolve_dependency!(dist, options) ⇒ Array<String> .resolve_dependency!(pom_path, options) ⇒ Array<String>

Overloads:

  • .resolve_dependency!(dist, options) ⇒ Array<String>

    Resolve Hadoop and HBase dependency with a predefined Maven profile

    Parameters:

    • dist (String)

      HBase distribution: cdh4.2, cdh4.1, cdh3, 0.94, 0.92, local

    • options (Hash)

      Options

    Options Hash (options):

    • :verbose (Boolean)

      Enable verbose output

    Returns:

    • (Array<String>)

      Loaded JAR files

  • .resolve_dependency!(pom_path, options) ⇒ Array<String>

    Resolve Hadoop and HBase dependency with the given Maven POM file

    Parameters:

    • pom_path (String)

      Path to POM file

    • options (Hash)

      Options

    Options Hash (options):

    • :verbose (Boolean)

      Enable verbose output

    • :profile (String)

      Maven profile

    Returns:

    • (Array<String>)

      Loaded JAR files



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/hbase-jruby/dependency.rb', line 38

def resolve_dependency! dist, options = {}
  # Backward-compatibility
  options = { :verbose => options } if [true, false].include?(options)
  options = { :verbose => false }.merge(options)

  dist    = dist.to_s
  verbose = options[:verbose]

  silencer = verbose ? '' : '> /dev/null'
  tempfiles = []

  jars =
    if %w[hbase local].include?(dist)
      # Check for hbase executable
      hbase = `which hbase`
      raise RuntimeError, "Cannot find `hbase` executable" if hbase.empty?
      `hbase classpath`.strip.split(':').map { |e| Dir[e] }.flatten
    else
      # Check for Maven executable
      mvn = `which mvn`
      raise RuntimeError, "Cannot find `mvn` executable" if mvn.empty?

      # POM file path given (with optional profile)
      if File.exists?(dist)
        path = dist
        profile = options[:profile] && "-P #{options[:profile]}"
      # Predefined dependencies
      else
        matched_profiles = SUPPORTED_PROFILES.keys.select { |pf| dist.start_with? pf }
        if matched_profiles.length != 1
          raise ArgumentError, "Invalid profile: #{dist}"
        end
        matched_profile = matched_profiles.first
        profiles = SUPPORTED_PROFILES.dup
        profiles[matched_profile] = dist if dist != matched_profile
        tempfiles << tf = Tempfile.new('hbase-jruby-pom')
        erb = ERB.new(File.read File.expand_path("../pom/pom.xml.erb", __FILE__))
        tf << erb.result(binding)
        tf.close(false)
        path = tf.path
        profile = "-P #{matched_profile}"
      end

      # Download dependent JAR files and build classpath string
      tempfiles << tf = Tempfile.new('hbase-jruby-classpath')
      tf.close(false)
      system "mvn org.apache.maven.plugins:maven-dependency-plugin:2.5.1:resolve org.apache.maven.plugins:maven-dependency-plugin:2.5.1:build-classpath -Dsilent=true -Dmdep.outputFile=#{tf.path} #{profile} -f #{path} #{silencer}"

      raise RuntimeError.new("Error occurred. Set verbose option to see the log.") unless $?.exitstatus == 0

      if File.read(tf.path).empty?
        desc =
          if options[:profile]
            "#{dist} (#{options[:profile]})"
          else
            dist
          end
        raise ArgumentError.new("Invalid profile: #{desc}")
      end
      File.read(tf.path).split(':')
    end

  # Load jars
  jars_loaded = jars.select { |jar|
    File.file?(jar) &&
    File.extname(jar).downcase == '.jar' &&
    require(jar)
  }

  # Try importing Java classes again
  not_found = HBase.import_java_classes!
  if verbose && !not_found.empty?
    warn "Java classes not found: #{not_found.join(', ')}"
  end

  return jars_loaded
ensure
  tempfiles.each { |tempfile| tempfile.unlink rescue nil }
end

Instance Method Details

#admin {|admin| ... } ⇒ org.apache.hadoop.hbase.client.HBaseAdmin

Returns an HBaseAdmin object for administration

Yields:

  • (admin)

    An HBaseAdmin object

Yield Parameters:

  • admin (org.apache.hadoop.hbase.client.HBaseAdmin)

Returns:

  • (org.apache.hadoop.hbase.client.HBaseAdmin)


76
77
78
79
80
81
82
83
# File 'lib/hbase-jruby/hbase.rb', line 76

def admin
  if block_given?
    with_admin { |admin| yield admin }
  else
    check_closed
    HBaseAdmin.new @config
  end
end

#closenil

Closes HTablePool and connection

Returns:

  • (nil)


87
88
89
90
91
92
93
94
95
# File 'lib/hbase-jruby/hbase.rb', line 87

def close
  @mutex.synchronize do
    unless @closed
      @closed = true
      close_table_pool
      HConnectionManager.deleteConnection(@config, true)
    end
  end
end

#closed?Boolean

Returns whether if the connection is closed

Returns:

  • (Boolean)


99
100
101
# File 'lib/hbase-jruby/hbase.rb', line 99

def closed?
  @closed
end

#reset_table_poolnil

Reset underlying HTablePool

Returns:

  • (nil)


161
162
163
164
165
166
167
# File 'lib/hbase-jruby/hbase.rb', line 161

def reset_table_pool
  @mutex.synchronize do
    close_table_pool
    @htable_pool = HTablePool.new @config, java.lang.Integer::MAX_VALUE
  end
  nil
end

#snapshotsArray<Hash>

Returns an Array of snapshot information

Returns:

  • (Array<Hash>)


136
137
138
139
140
141
142
143
# File 'lib/hbase-jruby/hbase.rb', line 136

def snapshots
  with_admin { |admin| admin.listSnapshots }.map { |sd|
    props = sd.getAllFields.map { |k, v|
      [k.name.to_sym, v.respond_to?(:name) ? v.name : v]
    }
    Hash[props]
  }
end

#table_namesArray<String> Also known as: list

Returns the list of table names

Returns:

  • (Array<String>)


112
113
114
115
# File 'lib/hbase-jruby/hbase.rb', line 112

def table_names
  check_closed
  with_admin { |admin| admin.list_tables.map(&:name_as_string) }
end

#tablesArray<HBase::Table>

Returns the list of HBase::Table instances

Returns:



105
106
107
108
# File 'lib/hbase-jruby/hbase.rb', line 105

def tables
  check_closed
  table_names.map { |tn| table(tn) }
end