Class: HBase

Inherits:
Object
  • Object
show all
Includes:
Admin, Util
Defined in:
lib/hbase-jruby/hbase.rb,
lib/hbase-jruby.rb,
lib/hbase-jruby/row.rb,
lib/hbase-jruby/cell.rb,
lib/hbase-jruby/util.rb,
lib/hbase-jruby/admin.rb,
lib/hbase-jruby/table.rb,
lib/hbase-jruby/schema.rb,
lib/hbase-jruby/scoped.rb,
lib/hbase-jruby/version.rb,
lib/hbase-jruby/byte_array.rb,
lib/hbase-jruby/dependency.rb,
lib/hbase-jruby/table/admin.rb,
lib/hbase-jruby/table/mutation.rb,
lib/hbase-jruby/batch_exception.rb,
lib/hbase-jruby/table/inspection.rb,
lib/hbase-jruby/scoped/aggregation.rb,
lib/hbase-jruby/table/batch_action.rb,
lib/hbase-jruby/table/checked_operation.rb

Overview

HBase connection

Defined Under Namespace

Modules: Admin, JRuby, Util Classes: BatchException, ByteArray, Cell, Row, Schema, Scoped, Table

Constant Summary collapse

Result =

For backward compatibility

HBase::Row
DEFAULT_COLUMN_CACHE_SIZE =
200
@@log4j =
nil

Constants included from Util

Util::JAVA_BYTE_ARRAY_CLASS, Util::JAVA_BYTE_ARRAY_EMPTY

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Util

append_0, from_bytes, java_bytes?, parse_column_name, to_bytes, to_typed_bytes

Constructor Details

#initialize(zookeeper_quorum) ⇒ HBase #initialize(config) ⇒ HBase

Connects to HBase

Overloads:

  • #initialize(zookeeper_quorum) ⇒ HBase

    Parameters:

    • zookeeper_quorum (String)

      hbase.zookeeper.quorum

  • #initialize(config) ⇒ HBase

    Parameters:

    • config (Hash)

      Key-value pairs to build an HBaseConfiguration from



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/hbase-jruby/hbase.rb', line 52

# Connects to HBase.
#
# @overload initialize(zookeeper_quorum)
#   @param [String] zookeeper_quorum Value for hbase.zookeeper.quorum
# @overload initialize(config)
#   @param [Hash] config Key-value pairs to build an HBaseConfiguration from
# @overload initialize(configuration)
#   @param [org.apache.hadoop.conf.Configuration] configuration
#     An existing Configuration object, used as is
# @raise [NameError] if org.apache.hadoop.conf.Configuration cannot be
#   resolved, i.e. the required Java classes are not loaded
def initialize config = {}
  # Resolving a Hadoop class up front turns a missing CLASSPATH into a
  # readable error instead of an obscure NameError later on.
  begin
    org.apache.hadoop.conf.Configuration
  rescue NameError
    raise NameError,
      "Required Java classes not loaded. Set up CLASSPATH."
  end

  HBase.import_java_classes!

  @config =
    case config
    when String
      # A bare String is taken as the ZooKeeper quorum
      HBaseConfiguration.create.tap do |hbcfg|
        hbcfg.set 'hbase.zookeeper.quorum', config
      end
    when org.apache.hadoop.conf.Configuration
      config
    else
      # Anything else is iterated as key-value pairs; both sides are
      # stringified before being set on the configuration
      HBaseConfiguration.create.tap do |hbcfg|
        config.each do |k, v|
          hbcfg.set k.to_s, v.to_s
        end
      end
    end
  @connection = HConnectionManager.createConnection @config
  # An HTablePool is only created when the connection object itself does
  # not respond to getTable
  @htable_pool =
    if @connection.respond_to?(:getTable)
      nil
    else
      HTablePool.new @config, java.lang.Integer::MAX_VALUE
    end
  @mutex   = Mutex.new
  @schema  = Schema.new
  @closed  = false
end

Instance Attribute Details

#config ⇒ Object (readonly)



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/hbase-jruby/hbase.rb', line 10

# HBase connection.
#
# Holds the HBase configuration, the underlying connection and, when the
# connection object cannot hand out tables itself, a fallback HTablePool.
class HBase
  attr_reader :config, :schema

  include Admin
  include HBase::Util

  DEFAULT_COLUMN_CACHE_SIZE = 200

  # @overload HBase.log4j=(filename)
  #   Configure Log4j logging with the given file
  #   @param [String] filename Path to log4j.properties or log4j.xml file
  #   @return [String]
  # @overload HBase.log4j=(hash)
  #   Configure Log4j logging with the given Hash
  #   @param [Hash] hash Log4j properties in Ruby Hash
  #   @return [Hash]
  # @overload HBase.log4j=(props)
  #   Configure Log4j logging with the given Properties
  #   @param [java.util.Properties] props Properties object
  #   @return [java.util.Properties]
  def self.log4j= arg
    if arg.is_a?(Hash)
      props = java.util.Properties.new
      arg.each do |k, v|
        props.setProperty k.to_s, v.to_s
      end
      org.apache.log4j.PropertyConfigurator.configure props
    elsif arg.respond_to?(:to_str) || arg.respond_to?(:to_path)
      # Path-like argument: the file extension selects the configurator
      case File.extname(arg).downcase
      when '.xml'
        org.apache.log4j.xml.DOMConfigurator.configure arg
      else
        org.apache.log4j.PropertyConfigurator.configure arg
      end
    else
      # Not a Hash and not a path (e.g. java.util.Properties): hand it to
      # PropertyConfigurator directly. File.extname only accepts paths, so
      # it must not be called on such objects.
      org.apache.log4j.PropertyConfigurator.configure arg
    end
  end

  # Connects to HBase
  # @overload initialize(zookeeper_quorum)
  #   @param [String] zookeeper_quorum hbase.zookeeper.quorum
  # @overload initialize(config)
  #   @param [Hash] config Key-value pairs to build an HBaseConfiguration from
  # @overload initialize(configuration)
  #   @param [org.apache.hadoop.conf.Configuration] configuration
  #     An existing Configuration object, used as is
  # @raise [NameError] if the required Java classes are not loaded
  def initialize config = {}
    # Resolving a Hadoop class up front turns a missing CLASSPATH into a
    # readable error.
    begin
      org.apache.hadoop.conf.Configuration
    rescue NameError
      raise NameError,
        "Required Java classes not loaded. Set up CLASSPATH."
    end

    HBase.import_java_classes!

    @config =
      case config
      when String
        HBaseConfiguration.create.tap do |hbcfg|
          hbcfg.set 'hbase.zookeeper.quorum', config
        end
      when org.apache.hadoop.conf.Configuration
        config
      else
        HBaseConfiguration.create.tap do |hbcfg|
          config.each do |k, v|
            hbcfg.set k.to_s, v.to_s
          end
        end
      end
    @connection = HConnectionManager.createConnection @config
    # An HTablePool is only created when the connection object itself does
    # not respond to getTable
    @htable_pool =
      if @connection.respond_to?(:getTable)
        nil
      else
        HTablePool.new @config, java.lang.Integer::MAX_VALUE
      end
    @mutex   = Mutex.new
    @schema  = Schema.new
    @closed  = false
  end

  # Returns whether this instance is backed by an HTablePool, which is
  # deprecated in recent versions of HBase
  # @return [Boolean]
  def use_table_pool?
    !@htable_pool.nil?
  end

  # Returns an HBaseAdmin object for administration.
  # With a block, the call is delegated to with_admin and the admin object
  # is yielded to the block; without one, a new HBaseAdmin is returned.
  # @yield [admin] An HBaseAdmin object
  # @yieldparam [org.apache.hadoop.hbase.client.HBaseAdmin] admin
  # @return [org.apache.hadoop.hbase.client.HBaseAdmin]
  def admin
    if block_given?
      with_admin { |admin| yield admin }
    else
      check_closed
      HBaseAdmin.new @config
    end
  end

  # Closes the connection and cleans up the thread-local cache.
  # Calling it again on a closed instance skips the connection teardown.
  # @return [nil]
  def close
    @mutex.synchronize do
      unless @closed
        @closed = true
        @htable_pool.close if use_table_pool?
        @connection.close

        # To be deprecated
        begin
          HConnectionManager.deleteConnection(@config)
        rescue ArgumentError
          # HBase 0.92 or below
          HConnectionManager.deleteConnection(@config, true)
        end if use_table_pool?
      end
    end

    thread_local.delete self

    nil
  end

  # Returns whether the connection is closed
  # @return [Boolean]
  def closed?
    @closed
  end

  # Returns the list of HBase::Table instances
  # @return [Array<HBase::Table>]
  def tables
    table_names.map { |tn| table(tn) }
  end

  # Returns the list of table names
  # @return [Array<String>]
  def table_names
    with_admin { |admin| admin.list_tables.map(&:name_as_string) }
  end
  alias list table_names

  # Creates an HBase::Table instance for the specified name
  # @param [#to_s] table_name The name of the table
  # @param [Hash] opts Options
  #   @option opts [Fixnum] :column_cache The size of thread-local column-key
  #   interpretation cache (default: 200)
  # @yield [table] The created table, when a block is given
  # @return [HBase::Table] The table, or the block's result when a block
  #   is given
  # @raise [RuntimeError] if the connection is already closed
  def table table_name, opts = {}
    check_closed

    # Backward-compatibility (to be removed): a boolean :cache option maps
    # to the default cache size (truthy) or to 0 (falsy)
    if opts.has_key?(:cache)
      opts = { :column_cache => opts[:cache] ? DEFAULT_COLUMN_CACHE_SIZE : 0 }
    end

    ht = HBase::Table.send :new, self, @config,
        table_name, opts.fetch(:column_cache, DEFAULT_COLUMN_CACHE_SIZE)

    if block_given?
      yield ht
    else
      ht
    end
  end
  alias [] table

  # Returns an Array of snapshot information
  # @return [Array<Hash>]
  def snapshots
    with_admin { |admin| admin.listSnapshots }.map { |sd|
      props = sd.getAllFields.map { |k, v|
        # Values that respond to #name are replaced by that name
        [k.name.to_sym, v.respond_to?(:name) ? v.name : v]
      }
      Hash[props]
    }
  end

  # Replaces the schema with a new one built from the given Hash
  # @param [Hash] hash Table => definition pairs
  # @return [HBase::Schema]
  # @raise [ArgumentError] if the argument is not a Hash
  def schema= hash
    unless hash.is_a?(Hash)
      raise ArgumentError, "invalid schema: Hash required"
    end

    # Build the new schema completely before swapping it in
    schema = Schema.new
    hash.each do |table, definition|
      schema[table] = definition
    end
    @schema = schema
  end

  # Reset underlying HTablePool
  # @deprecated
  # @return [nil]
  # @raise [RuntimeError] if this instance is not using a table pool
  def reset_table_pool
    raise RuntimeError, 'Not using table pool' unless use_table_pool?

    @mutex.synchronize do
      @htable_pool.close
      @htable_pool = HTablePool.new @config, java.lang.Integer::MAX_VALUE
    end
    nil
  end

private
  # Obtains a table handle from the pool if there is one, otherwise from
  # the connection
  def get_htable name
    (@htable_pool || @connection).get_table name
  end

  # @raise [RuntimeError] if the connection is already closed
  def check_closed
    raise RuntimeError, "Connection already closed" if closed?
  end
end

#java ⇒ Object (readonly)



# File 'lib/hbase-jruby/cell.rb', line 3

#name ⇒ Object (readonly)



# File 'lib/hbase-jruby/table.rb', line 5

#schema ⇒ HBase::Schema

Returns:



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/hbase-jruby/hbase.rb', line 10

# HBase connection.
#
# Holds the HBase configuration, the underlying connection and, when the
# connection object cannot hand out tables itself, a fallback HTablePool.
class HBase
  attr_reader :config, :schema

  include Admin
  include HBase::Util

  DEFAULT_COLUMN_CACHE_SIZE = 200

  # @overload HBase.log4j=(filename)
  #   Configure Log4j logging with the given file
  #   @param [String] filename Path to log4j.properties or log4j.xml file
  #   @return [String]
  # @overload HBase.log4j=(hash)
  #   Configure Log4j logging with the given Hash
  #   @param [Hash] hash Log4j properties in Ruby Hash
  #   @return [Hash]
  # @overload HBase.log4j=(props)
  #   Configure Log4j logging with the given Properties
  #   @param [java.util.Properties] props Properties object
  #   @return [java.util.Properties]
  def self.log4j= arg
    if arg.is_a?(Hash)
      props = java.util.Properties.new
      arg.each do |k, v|
        props.setProperty k.to_s, v.to_s
      end
      org.apache.log4j.PropertyConfigurator.configure props
    elsif arg.respond_to?(:to_str) || arg.respond_to?(:to_path)
      # Path-like argument: the file extension selects the configurator
      case File.extname(arg).downcase
      when '.xml'
        org.apache.log4j.xml.DOMConfigurator.configure arg
      else
        org.apache.log4j.PropertyConfigurator.configure arg
      end
    else
      # Not a Hash and not a path (e.g. java.util.Properties): hand it to
      # PropertyConfigurator directly. File.extname only accepts paths, so
      # it must not be called on such objects.
      org.apache.log4j.PropertyConfigurator.configure arg
    end
  end

  # Connects to HBase
  # @overload initialize(zookeeper_quorum)
  #   @param [String] zookeeper_quorum hbase.zookeeper.quorum
  # @overload initialize(config)
  #   @param [Hash] config Key-value pairs to build an HBaseConfiguration from
  # @overload initialize(configuration)
  #   @param [org.apache.hadoop.conf.Configuration] configuration
  #     An existing Configuration object, used as is
  # @raise [NameError] if the required Java classes are not loaded
  def initialize config = {}
    # Resolving a Hadoop class up front turns a missing CLASSPATH into a
    # readable error.
    begin
      org.apache.hadoop.conf.Configuration
    rescue NameError
      raise NameError,
        "Required Java classes not loaded. Set up CLASSPATH."
    end

    HBase.import_java_classes!

    @config =
      case config
      when String
        HBaseConfiguration.create.tap do |hbcfg|
          hbcfg.set 'hbase.zookeeper.quorum', config
        end
      when org.apache.hadoop.conf.Configuration
        config
      else
        HBaseConfiguration.create.tap do |hbcfg|
          config.each do |k, v|
            hbcfg.set k.to_s, v.to_s
          end
        end
      end
    @connection = HConnectionManager.createConnection @config
    # An HTablePool is only created when the connection object itself does
    # not respond to getTable
    @htable_pool =
      if @connection.respond_to?(:getTable)
        nil
      else
        HTablePool.new @config, java.lang.Integer::MAX_VALUE
      end
    @mutex   = Mutex.new
    @schema  = Schema.new
    @closed  = false
  end

  # Returns whether this instance is backed by an HTablePool, which is
  # deprecated in recent versions of HBase
  # @return [Boolean]
  def use_table_pool?
    !@htable_pool.nil?
  end

  # Returns an HBaseAdmin object for administration.
  # With a block, the call is delegated to with_admin and the admin object
  # is yielded to the block; without one, a new HBaseAdmin is returned.
  # @yield [admin] An HBaseAdmin object
  # @yieldparam [org.apache.hadoop.hbase.client.HBaseAdmin] admin
  # @return [org.apache.hadoop.hbase.client.HBaseAdmin]
  def admin
    if block_given?
      with_admin { |admin| yield admin }
    else
      check_closed
      HBaseAdmin.new @config
    end
  end

  # Closes the connection and cleans up the thread-local cache.
  # Calling it again on a closed instance skips the connection teardown.
  # @return [nil]
  def close
    @mutex.synchronize do
      unless @closed
        @closed = true
        @htable_pool.close if use_table_pool?
        @connection.close

        # To be deprecated
        begin
          HConnectionManager.deleteConnection(@config)
        rescue ArgumentError
          # HBase 0.92 or below
          HConnectionManager.deleteConnection(@config, true)
        end if use_table_pool?
      end
    end

    thread_local.delete self

    nil
  end

  # Returns whether the connection is closed
  # @return [Boolean]
  def closed?
    @closed
  end

  # Returns the list of HBase::Table instances
  # @return [Array<HBase::Table>]
  def tables
    table_names.map { |tn| table(tn) }
  end

  # Returns the list of table names
  # @return [Array<String>]
  def table_names
    with_admin { |admin| admin.list_tables.map(&:name_as_string) }
  end
  alias list table_names

  # Creates an HBase::Table instance for the specified name
  # @param [#to_s] table_name The name of the table
  # @param [Hash] opts Options
  #   @option opts [Fixnum] :column_cache The size of thread-local column-key
  #   interpretation cache (default: 200)
  # @yield [table] The created table, when a block is given
  # @return [HBase::Table] The table, or the block's result when a block
  #   is given
  # @raise [RuntimeError] if the connection is already closed
  def table table_name, opts = {}
    check_closed

    # Backward-compatibility (to be removed): a boolean :cache option maps
    # to the default cache size (truthy) or to 0 (falsy)
    if opts.has_key?(:cache)
      opts = { :column_cache => opts[:cache] ? DEFAULT_COLUMN_CACHE_SIZE : 0 }
    end

    ht = HBase::Table.send :new, self, @config,
        table_name, opts.fetch(:column_cache, DEFAULT_COLUMN_CACHE_SIZE)

    if block_given?
      yield ht
    else
      ht
    end
  end
  alias [] table

  # Returns an Array of snapshot information
  # @return [Array<Hash>]
  def snapshots
    with_admin { |admin| admin.listSnapshots }.map { |sd|
      props = sd.getAllFields.map { |k, v|
        # Values that respond to #name are replaced by that name
        [k.name.to_sym, v.respond_to?(:name) ? v.name : v]
      }
      Hash[props]
    }
  end

  # Replaces the schema with a new one built from the given Hash
  # @param [Hash] hash Table => definition pairs
  # @return [HBase::Schema]
  # @raise [ArgumentError] if the argument is not a Hash
  def schema= hash
    unless hash.is_a?(Hash)
      raise ArgumentError, "invalid schema: Hash required"
    end

    # Build the new schema completely before swapping it in
    schema = Schema.new
    hash.each do |table, definition|
      schema[table] = definition
    end
    @schema = schema
  end

  # Reset underlying HTablePool
  # @deprecated
  # @return [nil]
  # @raise [RuntimeError] if this instance is not using a table pool
  def reset_table_pool
    raise RuntimeError, 'Not using table pool' unless use_table_pool?

    @mutex.synchronize do
      @htable_pool.close
      @htable_pool = HTablePool.new @config, java.lang.Integer::MAX_VALUE
    end
    nil
  end

private
  # Obtains a table handle from the pool if there is one, otherwise from
  # the connection
  def get_htable name
    (@htable_pool || @connection).get_table name
  end

  # @raise [RuntimeError] if the connection is already closed
  def check_closed
    raise RuntimeError, "Connection already closed" if closed?
  end
end

#table ⇒ Object (readonly) Also known as: []



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/hbase-jruby/hbase.rb', line 158

# Builds an HBase::Table handle for the given table name.
#
# @param [#to_s] table_name The name of the table
# @param [Hash] opts Options
#   @option opts [Fixnum] :column_cache Size of the column-key
#   interpretation cache (default: DEFAULT_COLUMN_CACHE_SIZE)
# @yield [table] The created table, when a block is given
# @return [HBase::Table] The table, or the block's result when a block
#   is given
def table table_name, opts = {}
  check_closed

  # Backward-compatibility (to be removed): a legacy boolean :cache option
  # overrides everything else and maps to the default size or to 0
  cache_size =
    if opts.key?(:cache)
      opts[:cache] ? DEFAULT_COLUMN_CACHE_SIZE : 0
    else
      opts.fetch(:column_cache, DEFAULT_COLUMN_CACHE_SIZE)
    end

  handle = HBase::Table.send(:new, self, @config, table_name, cache_size)
  return handle unless block_given?

  yield handle
end

Class Method Details

.ByteArray(*values) ⇒ HBase::ByteArray

Shortcut method to HBase::ByteArray.new

Parameters:

  • values (*Object)

Returns:



6
7
8
# File 'lib/hbase-jruby/byte_array.rb', line 6

# Shortcut for ByteArray.new: forwards all given values to the constructor.
#
# @param [*Object] values
# @return [HBase::ByteArray]
def ByteArray(*values)
  ByteArray.new(*values)
end

.import_java_classes! ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/hbase-jruby/dependency.rb', line 74

# Imports the Java classes registered in @deps.
#
# @deps is iterated as (base, list) pairs. Each entry of a list is either a
# single class or a group of alternatives (it is splatted with [*classes]);
# an entry is considered done as soon as one of its classes can be
# java_import-ed inside base, and is then removed from the list. Entries for
# which every java_import raises NameError stay in the list.
#
# @return [nil]
def import_java_classes!
  # All of the work runs inside @mutex.synchronize; the loop mutates @deps
  # and its lists in place.
  @mutex.synchronize do
    @deps.each do |base, list|
      # java_import is evaluated with base as self
      base.class_eval do
        list.reject! do |classes|
          # find stops at the first class that imports without a NameError
          [*classes].find do |klass|
            begin
              java_import klass
              true
            rescue NameError
              false
            end
          end
        end
      end
    end
    # Drop bases that have nothing left to import
    @deps.reject! { |k, v| v.empty? }

    # Once nothing is left, replace this method on the receiver with an
    # empty one so that later calls do no work
    self.instance_eval do
      def import_java_classes!
      end
    end if @deps.empty?
  end
  nil
end

.HBase.log4j=(filename) ⇒ String .HBase.log4j=(hash) ⇒ Hash .HBase.log4j=(props) ⇒ java.util.Properties

Overloads:

  • .HBase.log4j=(filename) ⇒ String

    Configure Log4j logging with the given file

    Parameters:

    • filename (String)

      Path to log4j.properties or log4j.xml file

    Returns:

    • (String)
  • .HBase.log4j=(hash) ⇒ Hash

    Configure Log4j logging with the given Hash

    Parameters:

    • hash (Hash)

      Log4j properties in Ruby Hash

    Returns:

    • (Hash)
  • .HBase.log4j=(props) ⇒ java.util.Properties

    Configure Log4j logging with the given Properties

    Parameters:

    • props (java.util.Properties)

      Properties object

    Returns:

    • (java.util.Properties)


30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/hbase-jruby/hbase.rb', line 30

# Configures Log4j logging.
#
# @overload HBase.log4j=(filename)
#   @param [String] filename Path to log4j.properties or log4j.xml file;
#     a (case-insensitive) .xml extension selects DOMConfigurator,
#     anything else PropertyConfigurator
# @overload HBase.log4j=(hash)
#   @param [Hash] hash Log4j properties in Ruby Hash; keys and values are
#     stringified and copied into a java.util.Properties
# @overload HBase.log4j=(props)
#   @param [java.util.Properties] props Properties object, passed to
#     PropertyConfigurator as is
def self.log4j= arg
  if arg.is_a?(Hash)
    props = java.util.Properties.new
    arg.each do |k, v|
      props.setProperty k.to_s, v.to_s
    end
    org.apache.log4j.PropertyConfigurator.configure props
  elsif arg.respond_to?(:to_str) || arg.respond_to?(:to_path)
    # Path-like argument: the file extension selects the configurator
    case File.extname(arg).downcase
    when '.xml'
      org.apache.log4j.xml.DOMConfigurator.configure arg
    else
      org.apache.log4j.PropertyConfigurator.configure arg
    end
  else
    # Not a Hash and not a path (e.g. java.util.Properties): hand it to
    # PropertyConfigurator directly. File.extname only accepts paths, so it
    # must not be called on such objects.
    org.apache.log4j.PropertyConfigurator.configure arg
  end
end

.version ⇒ String

Returns the version of the loaded client library

Returns:

  • (String)


69
70
71
# File 'lib/hbase-jruby/dependency.rb', line 69

# Returns the version of the loaded client library, as reported by
# org.apache.hadoop.hbase.util.VersionInfo.
#
# @return [String]
def version
  version_info = org.apache.hadoop.hbase.util.VersionInfo
  version_info.getVersion
end

Instance Method Details

#admin {|admin| ... } ⇒ org.apache.hadoop.hbase.client.HBaseAdmin

Returns an HBaseAdmin object for administration

Yields:

  • (admin)

    An HBaseAdmin object

Yield Parameters:

  • admin (org.apache.hadoop.hbase.client.HBaseAdmin)

Returns:

  • (org.apache.hadoop.hbase.client.HBaseAdmin)


100
101
102
103
104
105
106
107
# File 'lib/hbase-jruby/hbase.rb', line 100

# Gives access to an HBaseAdmin object.
#
# With a block, the work is delegated to with_admin and the admin object it
# provides is yielded to the block. Without a block, a new HBaseAdmin built
# from @config is returned after checking that the connection is open.
#
# @yield [admin] An HBaseAdmin object
# @yieldparam [org.apache.hadoop.hbase.client.HBaseAdmin] admin
# @return [org.apache.hadoop.hbase.client.HBaseAdmin]
def admin
  return with_admin { |hbase_admin| yield hbase_admin } if block_given?

  check_closed
  HBaseAdmin.new @config
end

#close ⇒ nil

Closes the connection and cleans up the thread-local cache

Returns:

  • (nil)


111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/hbase-jruby/hbase.rb', line 111

# Shuts the connection down and drops this instance from the thread-local
# cache. The teardown inside the mutex runs only once; later calls skip it.
#
# @return [nil]
def close
  @mutex.synchronize do
    next if @closed

    @closed = true
    @htable_pool.close if use_table_pool?
    @connection.close

    # To be deprecated
    if use_table_pool?
      begin
        HConnectionManager.deleteConnection(@config)
      rescue ArgumentError
        # HBase 0.92 or below
        HConnectionManager.deleteConnection(@config, true)
      end
    end
  end

  thread_local.delete self
  nil
end

#closed? ⇒ Boolean

Returns whether the connection is closed

Returns:

  • (Boolean)


135
136
137
# File 'lib/hbase-jruby/hbase.rb', line 135

# Returns the @closed flag, which initialize sets to false and close sets
# to true.
#
# @return [Boolean]
def closed?
  @closed
end

#reset_table_pool ⇒ nil

Deprecated.

Reset underlying HTablePool

Returns:

  • (nil)

Raises:

  • (RuntimeError)


205
206
207
208
209
210
211
212
213
# File 'lib/hbase-jruby/hbase.rb', line 205

# Closes the current HTablePool and replaces it with a fresh one built from
# @config.
#
# @deprecated
# @return [nil]
# @raise [RuntimeError] if this instance is not using a table pool
def reset_table_pool
  unless use_table_pool?
    raise RuntimeError, 'Not using table pool'
  end

  @mutex.synchronize do
    @htable_pool.close
    @htable_pool = HTablePool.new(@config, java.lang.Integer::MAX_VALUE)
  end

  nil
end

#snapshots ⇒ Array<Hash>

Returns an Array of snapshot information

Returns:

  • (Array<Hash>)


179
180
181
182
183
184
185
186
# File 'lib/hbase-jruby/hbase.rb', line 179

# Lists snapshot information, one Hash per snapshot.
#
# Each Hash is built from the snapshot's getAllFields pairs: the key is the
# field's name as a Symbol, and a value that responds to #name is replaced
# by that name.
#
# @return [Array<Hash>]
def snapshots
  descriptions = with_admin { |admin| admin.listSnapshots }

  descriptions.map do |description|
    pairs = description.getAllFields.map do |field, value|
      shown = value.respond_to?(:name) ? value.name : value
      [field.name.to_sym, shown]
    end
    Hash[pairs]
  end
end

#table_names ⇒ Array<String> Also known as: list

Returns the list of table names

Returns:

  • (Array<String>)


147
148
149
# File 'lib/hbase-jruby/hbase.rb', line 147

# Returns the names of the tables reported by the admin object's
# list_tables, each converted with name_as_string.
#
# @return [Array<String>]
def table_names
  with_admin do |admin|
    admin.list_tables.map { |descriptor| descriptor.name_as_string }
  end
end

#tables ⇒ Array<HBase::Table>

Returns the list of HBase::Table instances

Returns:



141
142
143
# File 'lib/hbase-jruby/hbase.rb', line 141

# Returns one table object per entry of table_names, each obtained
# through #table.
#
# @return [Array<HBase::Table>]
def tables
  names = table_names
  names.map { |name| table(name) }
end

#use_table_pool? ⇒ Boolean

Returns whether this instance is backed by an HTablePool, which is deprecated in recent versions of HBase

Returns:

  • (Boolean)


92
93
94
# File 'lib/hbase-jruby/hbase.rb', line 92

# Returns true when @htable_pool is set. initialize only creates an
# HTablePool when the connection does not respond to getTable; otherwise
# @htable_pool is nil.
#
# @return [Boolean]
def use_table_pool?
  !@htable_pool.nil?
end