Class: ActiveRecord::ConnectionAdapters::HypertableAdapter

Inherits:
AbstractAdapter
  • Object
show all
Defined in:
lib/active_record/connection_adapters/hypertable_adapter.rb

Constant Summary collapse

@@read_latency =
0.0
@@write_latency =
0.0
@@cells_read =
0

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection, logger, config) ⇒ HypertableAdapter

Returns a new instance of HypertableAdapter.



48
49
50
51
52
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 48

# Builds the adapter.
#
# connection - handed to the superclass initializer together with logger.
#              NOTE(review): presumably this becomes @connection, which the
#              other methods of this class call into - confirm in
#              AbstractAdapter.
# logger     - handed to the superclass initializer.
# config     - kept as @config for later use (with_thrift_client reads
#              :host from it).
def initialize(connection, logger, config)
  super(connection, logger)
  @config = config
  # Per-table map of Ruby-safe column names to the original Hypertable
  # column names; filled in by #columns.
  @hypertable_column_names = {}
end

Instance Attribute Details

#retry_on_failure ⇒ Object

Returns the value of attribute retry_on_failure.



46
47
48
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 46

# Returns the current value of @retry_on_failure. retry_on_connection_error
# sets this flag to true before running its block and clears it once a
# reconnect has been attempted.
def retry_on_failure
  @retry_on_failure
end

Class Method Details

.get_timing ⇒ Object



65
66
67
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 65

# Returns the class-wide performance counters as a three-element array:
# [accumulated read latency, accumulated write latency, cells read].
# The latencies are sums of Time differences, i.e. seconds as Floats.
def self.get_timing
  [@@read_latency, @@write_latency, @@cells_read]
end

.reset_timing ⇒ Object



59
60
61
62
63
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 59

# Resets the class-wide performance counters (read latency, write latency
# and cells read) to zero.
def self.reset_timing
  @@read_latency = 0.0
  @@write_latency = 0.0
  @@cells_read = 0
end

Instance Method Details

#adapter_name ⇒ Object



69
70
71
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 69

# Returns the human-readable name of this adapter, the String 'Hypertable'.
def adapter_name
  'Hypertable'
end

#add_column(table_name, column_name, type = :string, options = {}) ⇒ Object



354
355
356
357
358
359
360
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 354

# Adds a column to an existing table by executing an ALTER TABLE ... ADD
# statement.
#
# table_name  - table to alter.
# column_name - column to add.
# type        - accepted for interface compatibility; not used when building
#               the statement.
# options     - :max_versions, when not blank, adds a MAX_VERSIONS=<n> clause.
#
# Returns whatever #execute returns.
def add_column(table_name, column_name, type=:string, options = {})
  max_versions = options[:max_versions]

  clauses = [quote_column_name(column_name)]
  clauses << "MAX_VERSIONS=#{max_versions}" unless max_versions.blank?

  execute("ALTER TABLE #{quote_table_name(table_name)} ADD ( #{clauses.join(' ')} )")
end

#add_column_options!(hql, options) ⇒ Object



362
363
364
365
366
367
368
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 362

# Appends column option clauses to +hql+ in place.
#
# hql     - statement under construction; appended to with <<.
# options - when options_include_default?(options) is true, a
#           " MAX_VERSIONS =1 <quoted default>" clause is appended (the
#           default is quoted with options[:column]); when :null is exactly
#           false, " NOT NULL" is appended.
#
# Returns +hql+ when " NOT NULL" was appended, nil otherwise.
def add_column_options!(hql, options)
  if options_include_default?(options)
    quoted_default = quote(options[:default], options[:column])
    hql << " MAX_VERSIONS =1 #{quoted_default}"
  end

  # must explicitly check for :null to allow change_column to work on migrations
  hql << " NOT NULL" if options[:null] == false
end

#add_column_to_name_map(table_name, name) ⇒ Object



267
268
269
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 267

# Registers +name+ in the name map of +table_name+, keyed by its Ruby-safe
# form (see rubify_column_name).
#
# Expects @hypertable_column_names[table_name] to exist already (#columns
# creates it); raises NoMethodError on nil otherwise.
def add_column_to_name_map(table_name, name)
  @hypertable_column_names[table_name][rubify_column_name(name)] = name
end

#add_qualified_column(table_name, column_family, qualifiers = [], default = '', sql_type = nil, null = true) ⇒ Object



271
272
273
274
275
276
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 271

# Builds a QualifiedColumn for +column_family+ and registers every
# "family:qualifier" name in the name map of +table_name+.
#
# table_name    - table whose name map receives the qualified names.
# column_family - column family name.
# qualifiers    - list of qualifiers, stored on the returned column.
# default, sql_type, null - passed straight to QualifiedColumn.new.
#
# Returns the new QualifiedColumn.
def add_qualified_column(table_name, column_family, qualifiers=[], default='', sql_type=nil, null=true)
  column = QualifiedColumn.new(column_family, default, sql_type, null)
  column.qualifiers = qualifiers

  qualifiers.each do |qualifier|
    full_name = qualified_column_name(column_family, qualifier)
    add_column_to_name_map(table_name, full_name)
  end

  column
end

#cell_native_array(row_key, column_family, column_qualifier, value = nil, timestamp = nil) ⇒ Object

Creates the native array format for a cell: [“row_key”, “column_family”, “column_qualifier”, “value”].



443
444
445
446
447
448
449
450
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 443

# Creates the native array format for a cell:
# ["row_key", "column_family", "column_qualifier", "value"]
# Every element is converted with to_s, so nil becomes ''.
#
# NOTE: +timestamp+ is accepted but is not part of the returned array.
def cell_native_array(row_key, column_family, column_qualifier, value=nil, timestamp=nil)
  [row_key, column_family, column_qualifier, value].map { |part| part.to_s }
end

#change_column(table_name, column_name, new_column_name) ⇒ Object



305
306
307
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 305

# Not supported by Hypertable; always raises a RuntimeError.
#
# ActiveRecord calls change_column(table_name, column_name, type, options),
# so a trailing optional +options+ argument is accepted. Without it a
# four-argument call fails with ArgumentError before the explanatory error
# below can be raised. All arguments are ignored.
def change_column(table_name, column_name, new_column_name, options = {})
  raise "change_column operation not supported by Hypertable."
end

#change_column_default(table_name, column_name, default) ⇒ Object



346
347
348
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 346

# Not supported by Hypertable; always raises a RuntimeError. All arguments
# are ignored.
def change_column_default(table_name, column_name, default)
  raise "change_column_default operation not supported by Hypertable."
end

#change_column_null(table_name, column_name, null, default = nil) ⇒ Object



350
351
352
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 350

# Not supported by Hypertable; always raises a RuntimeError. All arguments
# are ignored.
def change_column_null(table_name, column_name, null, default = nil)
  raise "change_column_null operation not supported by Hypertable."
end

#close_mutator(mutator, flush = true) ⇒ Object



505
506
507
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 505

# Delegates to @connection.close_mutator, passing +mutator+ and +flush+
# (true by default) through unchanged.
# NOTE(review): presumably flush controls whether buffered cells are written
# before closing - confirm against the Thrift client API.
def close_mutator(mutator, flush=true)
  @connection.close_mutator(mutator, flush)
end

#close_scanner(scanner) ⇒ Object



519
520
521
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 519

# Delegates to @connection.close_scanner for +scanner+ (see open_scanner).
def close_scanner(scanner)
  @connection.close_scanner(scanner)
end

#columns(table_name, name = nil) ⇒ Object

Returns array of column objects for table associated with this class. Hypertable allows columns to include dashes in the name. This doesn’t play well with Ruby (can’t have dashes in method names), so we must maintain a mapping of original column names to Ruby-safe names.



243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 243

# Returns the array of column objects for +table_name+: the 'ROW' key column
# followed by one column per element found under the "default" access group
# of the table schema.
#
# Hypertable allows dashes in column names, which Ruby method names cannot
# contain, so the Ruby-safe name of every column is recorded in
# @hypertable_column_names[table_name] together with its original name.
#
# +name+ is accepted but not used.
def columns(table_name, name = nil)#:nodoc:
  # Each table always has a row key called 'ROW'
  row_key_column = Column.new('ROW', '')

  schema_doc = REXML::Document.new(describe_table(table_name))
  families = schema_doc.elements['Schema/AccessGroup[@name="default"]'].elements.to_a

  name_map = (@hypertable_column_names[table_name] ||= {})

  family_columns = families.map do |family|
    original_name = family.elements['Name'].text
    safe_name = rubify_column_name(original_name)
    name_map[safe_name] = original_name
    new_column(safe_name, '')
  end

  [row_key_column] + family_columns
end

#convert_options_to_scan_spec(options = {}) ⇒ Object



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 159

def convert_options_to_scan_spec(options={})
  sanitize_conditions(options)

  # Rows can be specified using a number of different options:
  # :row_keys => [row_key_1, row_key_2, ...]
  # :start_row and :end_row
  # :row_intervals => [[start_1, end_1], [start_2, end_2]]
  row_intervals = []

  options[:start_inclusive] = options.has_key?(:start_inclusive) ? options[:start_inclusive] : true
  options[:end_inclusive] = options.has_key?(:end_inclusive) ? options[:end_inclusive] : true

  if options[:row_keys]
    options[:row_keys].flatten.each do |rk|
      row_intervals << [rk, rk]
    end
  elsif options[:row_intervals]
    options[:row_intervals].each do |ri|
      row_intervals << [ri.first, ri.last]
    end
  elsif options[:start_row]
    raise "missing :end_row" if !options[:end_row]
    row_intervals << [options[:start_row], options[:end_row]]
  end

  options[:row_intervals] = row_intervals.map do |row_interval|
    ri = Hypertable::ThriftGen::RowInterval.new
    ri.start_row = row_interval.first
    ri.start_inclusive = options[:start_inclusive]
    ri.end_row = row_interval.last
    ri.end_inclusive = options[:end_inclusive]
    ri
  end

  scan_spec = Hypertable::ThriftGen::ScanSpec.new
  options[:revs] ||= 1
  options[:return_deletes] ||= false

  for key in options.keys
    case key.to_sym
      when :row_intervals
        scan_spec.row_intervals = options[key]
      when :cell_intervals
        scan_spec.cell_intervals = options[key]
      when :start_time
        scan_spec.start_time = options[key]
      when :end_time
        scan_spec.end_time = options[key]
      when :limit
        scan_spec.row_limit = options[key]
      when :revs
        scan_spec.revs = options[key]
      when :return_deletes
        scan_spec.return_deletes = options[key]
      when :select
        # Columns listed here can be column families only (not
        # column qualifiers) at this time.
        requested_columns = options[key].is_a?(String) ? options[key].split(',').map{|s| s.strip} : options[key]
        scan_spec.columns = requested_columns.map do |column|
          status, family, qualifier = is_qualified_column_name?(column)
          family
        end.uniq
      when :table_name, :start_row, :end_row, :start_inclusive, :end_inclusive, :select, :columns, :row_keys, :conditions, :include, :readonly, :scan_spec
        # ignore
      else
        raise "Unrecognized scan spec option: #{key}"
    end
  end

  scan_spec
end

#create_table(table_name, options = {}) ⇒ Object



332
333
334
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 332

# Creates +table_name+ by executing the HQL built by create_table_hql.
#
# The caller's block (which receives the table definition and declares the
# columns) is forwarded explicitly: create_table_hql yields to its own block,
# and Ruby does not pass a method's block on to the methods it calls, so
# without forwarding every call fails with LocalJumpError.
#
# Returns whatever #execute returns.
def create_table(table_name, options = {}, &block)
  execute(create_table_hql(table_name, options, &block))
end

#create_table_hql(table_name, options = {}) {|table_definition| ... } ⇒ Object

Yields:

  • (table_definition)


309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 309

# Builds the CREATE TABLE statement for +table_name+.
#
# Yields a HyperTableDefinition on which the caller declares the columns; a
# block is required. Each declared column contributes its quoted name plus a
# MAX_VERSIONS=<n> clause when it has max_versions set.
#
# options - :force drops the table first if it already exists (note that the
#           drop happens here, while the statement is being built);
#           :options is appended verbatim after the column list.
#
# Returns the HQL String, stripped of surrounding whitespace.
def create_table_hql(table_name, options={})
  definition = HyperTableDefinition.new(self)

  yield definition

  drop_table(table_name, options) if options[:force] && table_exists?(table_name)

  column_clauses = definition.columns.map do |column|
    versions_clause = column.max_versions ? "MAX_VERSIONS=#{column.max_versions}" : ''
    "#{quote_table_name(column.name)} #{versions_clause}"
  end

  "CREATE TABLE #{quote_table_name(table_name)} ( #{column_clauses.join(', ')} ) #{options[:options]}".strip
end

#delete_cells(table_name, cells) ⇒ Object



452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 452

# Deletes the given cells from +table_name+.
#
# cells - cells in native array form (see thrift_cell_from_native_array);
#         each is converted to a Thrift cell flagged DELETE_CELL and written
#         through a mutator obtained from @connection.with_mutator.
#
# The write runs inside retry_on_connection_error, and the elapsed time is
# added to the class-wide write latency counter.
def delete_cells(table_name, cells)
  started_at = Time.now

  retry_on_connection_error do
    @connection.with_mutator(table_name) do |mutator|
      deletions = cells.map do |native_cell|
        deletion = thrift_cell_from_native_array(native_cell)
        deletion.flag = Hypertable::ThriftGen::CellFlag::DELETE_CELL
        deletion
      end
      @connection.set_cells(mutator, deletions)
    end
  end

  @@write_latency += Time.now - started_at
end

#delete_rows(table_name, row_keys) ⇒ Object



469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 469

# Deletes whole rows from +table_name+.
#
# row_keys - keys of the rows to delete; one Thrift cell flagged DELETE_ROW
#            is built per key and written through a mutator obtained from
#            @connection.with_mutator.
#
# The write runs inside retry_on_connection_error, and the elapsed time
# (including building the cells) is added to the class-wide write latency
# counter.
def delete_rows(table_name, row_keys)
  started_at = Time.now

  row_deletions = row_keys.map do |row_key|
    deletion = Hypertable::ThriftGen::Cell.new
    deletion.row_key = row_key
    deletion.flag = Hypertable::ThriftGen::CellFlag::DELETE_ROW
    deletion
  end

  retry_on_connection_error do
    @connection.with_mutator(table_name) do |mutator|
      @connection.set_cells(mutator, row_deletions)
    end
  end

  @@write_latency += Time.now - started_at
end

#describe_table(table_name) ⇒ Object



399
400
401
402
403
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 399

# Returns the schema of +table_name+ as reported by @connection.get_schema
# (#columns parses the result as XML). The call is wrapped in
# retry_on_connection_error.
def describe_table(table_name)
  retry_on_connection_error {
    @connection.get_schema(table_name)
  }
end

#drop_table(table_name, options = {}) ⇒ Object



336
337
338
339
340
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 336

# Drops +table_name+ via @connection.drop_table, wrapped in
# retry_on_connection_error.
#
# options - :if_exists is passed on as the second argument (false when
#           absent or nil).
def drop_table(table_name, options = {})
  if_exists = options[:if_exists] || false

  retry_on_connection_error do
    @connection.drop_table(table_name, if_exists)
  end
end

#each_cell(scanner, &block) ⇒ Object

Iterator methods



529
530
531
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 529

# Iterator: delegates to @connection.each_cell, forwarding +scanner+ and the
# caller's block.
def each_cell(scanner, &block)
  @connection.each_cell(scanner, &block)
end

#each_cell_as_arrays(scanner, &block) ⇒ Object



533
534
535
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 533

# Iterator: delegates to @connection.each_cell_as_arrays, forwarding
# +scanner+ and the caller's block.
def each_cell_as_arrays(scanner, &block)
  @connection.each_cell_as_arrays(scanner, &block)
end

#each_row(scanner, &block) ⇒ Object



537
538
539
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 537

# Iterator: delegates to @connection.each_row, forwarding +scanner+ and the
# caller's block.
def each_row(scanner, &block)
  @connection.each_row(scanner, &block)
end

#each_row_as_arrays(scanner, &block) ⇒ Object



541
542
543
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 541

# Iterator: delegates to @connection.each_row_as_arrays, forwarding
# +scanner+ and the caller's block.
def each_row_as_arrays(scanner, &block)
  @connection.each_row_as_arrays(scanner, &block)
end

#execute(hql, name = nil) ⇒ Object



231
232
233
234
235
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 231

# Runs the HQL statement +hql+ through @connection.hql_query.
#
# The query is wrapped in retry_on_connection_error, and the whole call is
# wrapped in #log together with +name+.
# Returns whatever the #log block wrapper returns.
def execute(hql, name=nil)
  log(hql, name) {
    retry_on_connection_error { @connection.hql_query(hql) }
  }
end

#execute_with_options(options) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 95

# Scans options[:table_name] with the scan spec derived from +options+ (see
# convert_options_to_scan_spec) and returns the matching cells.
#
# Cells are fetched with the native array method (get_cells_as_arrays), which
# is much faster than get_cells returning Hypertable::ThriftGen::Cell
# objects. The result looks like:
# [
#   ["page_1", "name", "", "LOLcats and more", "1237331693147619001"],
#   ["page_1", "url", "", "http://...", "1237331693147619002"]
# ]
#
# The fetch runs inside retry_on_connection_error. The elapsed time and the
# number of cells are added to the class-wide read counters.
def execute_with_options(options)
  scan_spec = convert_options_to_scan_spec(options)
  started_at = Time.now

  cells = retry_on_connection_error do
    @connection.get_cells_as_arrays(options[:table_name], scan_spec)
  end

  # Capture performance metrics
  @@read_latency += Time.now - started_at
  @@cells_read += cells.length

  cells
end

#flush_mutator(mutator) ⇒ Object



509
510
511
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 509

# Delegates to @connection.flush_mutator for +mutator+ (see open_mutator).
def flush_mutator(mutator)
  @connection.flush_mutator(mutator)
end

#handle_thrift_exceptions_with_missing_message ⇒ Object

Exceptions generated from the Thrift IDL do not set a message. This causes a lot of problems for Rails, which expects a String value and raises an exception when it encounters nil instead. Unfortunately, a message cannot always be assigned to an exception, so in that case a singleton method is defined on the exception to accomplish the same goal.



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 122

# Runs the given block and re-raises anything it raises, first making sure
# the exception has a String message.
#
# Exceptions generated from the Thrift IDL do not set a message, which Rails
# expects to be a String. When +message+ is nil it is replaced by the
# exception's +what+ value (or '' when that is nil too), either through
# +message=+ or, when there is no such writer, through a singleton +message+
# method on the exception.
#
# An exception without a +what+ method is handled as well and gets '' as its
# message; previously the patched +message+ raised NoMethodError and masked
# the original error.
#
# Returns the block's value when nothing is raised.
def handle_thrift_exceptions_with_missing_message
  yield
rescue Exception => err
  # Rescuing Exception is deliberate: nothing is swallowed, the exception is
  # always re-raised below.
  if !err.message
    if err.respond_to?("message=")
      err.message = (err.respond_to?(:what) && err.what) || ''
    else
      def err.message
        (respond_to?(:what) && what) || ''
      end
    end
  end

  raise
end

#hypertable_column_name(name, table_name, declared_columns_only = false) ⇒ Object



393
394
395
396
397
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 393

# Maps a Ruby-safe column +name+ back to the original Hypertable column name
# of +table_name+.
#
# The name map of a table is built by #columns. If it has not been built yet
# it is loaded here on first use; previously the lookup failed with
# NoMethodError on nil.
#
# declared_columns_only - when true, nil is returned for names that are not
#                         in the map; otherwise +name+ itself is returned as
#                         a fallback.
def hypertable_column_name(name, table_name, declared_columns_only=false)
  columns(table_name) unless @hypertable_column_names[table_name]

  declared_name = (@hypertable_column_names[table_name] || {})[name]
  return declared_name if declared_name || declared_columns_only

  name
end

#insert_fixture(fixture, table_name) ⇒ Object



487
488
489
490
491
492
493
494
495
496
497
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 487

# Writes one fixture into +table_name+.
#
# The fixture's hash form must carry the row key under 'ROW' and may carry a
# 'timestamp'; both are removed from the hash. Every remaining key is a
# column name, optionally of the form "family:qualifier", and becomes one
# cell of that row.
#
# Returns whatever #write_cells returns.
def insert_fixture(fixture, table_name)
  attributes = fixture.to_hash
  timestamp = attributes.delete('timestamp')
  row_key = attributes.delete('ROW')

  cells = attributes.map do |column, value|
    family, qualifier = column.split(':', 2)
    cell_native_array(row_key, family, qualifier, value, timestamp)
  end

  write_cells(table_name, cells)
end

#is_qualified_column_name?(column_name) ⇒ Boolean

Returns:

  • (Boolean)


290
291
292
293
294
295
296
297
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 290

# Splits +column_name+ at the first ':' to tell whether it carries a
# qualifier.
#
# Returns a three-element array:
#   [true, column_family, qualifier]  when a ':' is present
#   [false, column_name, nil]         otherwise
def is_qualified_column_name?(column_name)
  family, qualifier = column_name.split(':', 2)
  return [false, column_name, nil] unless qualifier

  [true, family, qualifier]
end

#native_database_types ⇒ Object



77
78
79
80
81
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 77

# Returns the mapping of abstract column types to native type definitions.
# Only :string is defined (name "varchar", limit 255).
def native_database_types
  {
    :string      => { :name => "varchar", :limit => 255 }
  }
end

#new_column(column_name, default_value = '') ⇒ Object



278
279
280
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 278

# Builds a Column whose name is the Ruby-safe form of +column_name+ (see
# rubify_column_name) and whose default is +default_value+.
def new_column(column_name, default_value='')
  Column.new(rubify_column_name(column_name), default_value)
end

#open_mutator(table_name) ⇒ Object

Mutator methods



501
502
503
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 501

# Mutator methods

# Delegates to @connection.open_mutator for +table_name+ and returns its
# result, which close_mutator and flush_mutator accept as +mutator+.
def open_mutator(table_name)
  @connection.open_mutator(table_name)
end

#open_scanner(table_name, scan_spec) ⇒ Object

Scanner methods



515
516
517
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 515

# Scanner methods

# Delegates to @connection.open_scanner for +table_name+ and +scan_spec+
# and returns its result, which close_scanner accepts as +scanner+.
# NOTE(review): the meaning of the hard-coded third argument (true) is not
# visible here - confirm against the Thrift client API.
def open_scanner(table_name, scan_spec)
  @connection.open_scanner(table_name, scan_spec, true)
end

#qualified_column_name(column_family, qualifier = nil) ⇒ Object



282
283
284
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 282

# Builds a column name of the form "family:qualifier". nil parts are left
# out, so without a qualifier the result is just the column family.
def qualified_column_name(column_family, qualifier=nil)
  parts = [column_family, qualifier].reject { |part| part.nil? }
  parts.join(':')
end

#quote(value, column = nil) ⇒ Object



377
378
379
380
381
382
383
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 377

# Quotes +value+ for use in a statement.
#
# nil becomes the empty string and Strings are returned as they are (no
# quoting or escaping is applied); every other value is handled by the
# superclass implementation.
def quote(value, column = nil)
  return '' if value.nil?
  return value if value.is_a?(String)

  super(value, column)
end

#quote_column_name(name) ⇒ Object



385
386
387
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 385

# Wraps +name+ in single quotes. The name itself is interpolated as is; no
# escaping of embedded quotes is applied.
def quote_column_name(name)
  "'#{name}'"
end

#quote_column_name_for_table(name, table_name) ⇒ Object



389
390
391
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 389

# Quotes the original Hypertable name of column +name+ in +table_name+: the
# name is first translated with hypertable_column_name, then quoted with
# quote_column_name.
def quote_column_name_for_table(name, table_name)
  quote_column_name(hypertable_column_name(name, table_name))
end

#remove_column(table_name, *column_names) ⇒ Object Also known as: remove_columns



370
371
372
373
374
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 370

# Drops one or more columns from +table_name+, executing one
# ALTER TABLE ... DROP statement per column.
#
# column_names - column names, given individually or as (nested) arrays.
#
# Returns the flattened list of column names.
def remove_column(table_name, *column_names)
  column_names.flatten.each do |column_name|
    table = quote_table_name(table_name)
    column = quote_column_name(column_name)
    execute("ALTER TABLE #{table} DROP(#{column})")
  end
end

#remove_column_from_name_map(table_name, name) ⇒ Object



263
264
265
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 263

# Removes +name+ from the name map of +table_name+, looked up by its
# Ruby-safe form (see rubify_column_name). Returns the removed original
# name, or nil when it was not in the map.
#
# Expects @hypertable_column_names[table_name] to exist already (#columns
# creates it); raises NoMethodError on nil otherwise.
def remove_column_from_name_map(table_name, name)
  @hypertable_column_names[table_name].delete(rubify_column_name(name))
end

#rename_column(table_name, column_name, new_column_name) ⇒ Object

Schema alterations



301
302
303
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 301

# Schema alterations

# Not supported by Hypertable; always raises a RuntimeError. All arguments
# are ignored.
def rename_column(table_name, column_name, new_column_name)
  raise "rename_column operation not supported by Hypertable."
end

#rename_table(table_name, options = {}) ⇒ Object



342
343
344
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 342

# Not supported by Hypertable; always raises a RuntimeError. All arguments
# are ignored.
def rename_table(table_name, options = {})
  raise "rename_table operation not supported by Hypertable."
end

#retry_on_connection_error ⇒ Object

Attempt to reconnect to the Thrift Broker once before aborting. This ensures graceful recovery in the case that the Thrift Broker goes down and then comes back up.



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 143

# Runs the given block and, if it fails with a connection-level error,
# reconnects and runs it one more time before giving up.
#
# This ensures graceful recovery in the case that the Thrift Broker goes
# down and then comes back up. Connection-level errors are
# Thrift::TransportException, IOError, Thrift::ApplicationException and
# Thrift::ProtocolException; anything else propagates immediately. A second
# connection-level failure is re-raised.
#
# Returns the block's value.
def retry_on_connection_error
  @retry_on_failure = true

  # The inner begin block matters: retry re-runs only this block, so the
  # flag set above is not reset and at most one retry happens.
  begin
    handle_thrift_exceptions_with_missing_message { yield }
  rescue Thrift::TransportException, IOError, Thrift::ApplicationException, Thrift::ProtocolException => connection_error
    raise connection_error unless @retry_on_failure

    @retry_on_failure = false
    @connection.close
    @connection.open
    retry
  end
end

#rubify_column_name(column_name) ⇒ Object



286
287
288
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 286

# Converts +column_name+ into a Ruby-safe name: its String form with every
# run of one or more dashes replaced by a single underscore.
def rubify_column_name(column_name)
  column_name.to_s.squeeze('-').tr('-', '_')
end

#sanitize_conditions(options) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 83

# Validates options[:conditions]: only nil (no conditions) is accepted.
#
# Raises RuntimeError for any other value. Hash conditions are rejected
# because they require Hypertable API support for querying by arbitrary cell
# value.
# NOTE(review): the message raised for non-Hash values says "Only hash
# conditions are supported" although Hash conditions are rejected as well -
# confirm the intended wording.
def sanitize_conditions(options)
  conditions = options[:conditions]
  return if conditions.nil?

  # requires Hypertable API to support query by arbitrary cell value
  raise "HyperRecord does not support specifying conditions by Hash" if conditions.is_a?(Hash)

  raise "Only hash conditions are supported"
end

#supports_migrations? ⇒ Boolean

Returns:

  • (Boolean)


73
74
75
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 73

# Reports that this adapter supports migrations; always returns true.
def supports_migrations?
  true
end

#tables(name = nil) ⇒ Object



405
406
407
408
409
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 405

# Returns the result of @connection.get_tables, wrapped in
# retry_on_connection_error. +name+ is accepted but not used.
def tables(name=nil)
  retry_on_connection_error {
    @connection.get_tables
  }
end

#thrift_cell_from_native_array(array) ⇒ Object

Takes a cell in native array form — [row_key, column_family, column_qualifier, value, timestamp] — and returns a Hypertable::ThriftGen::Cell object, which is required when the cell needs a flag on write (delete operations).



431
432
433
434
435
436
437
438
439
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 431

# Converts a cell in native array form,
# [row_key, column_family, column_qualifier, value, timestamp],
# into a Hypertable::ThriftGen::Cell. A Cell object is required when the
# cell needs a flag on write (delete operations).
#
# The qualifier is only set when it is not blank; value and timestamp are
# only set when present.
def thrift_cell_from_native_array(array)
  qualifier = array[2]
  value = array[3]
  timestamp = array[4]

  cell = Hypertable::ThriftGen::Cell.new
  cell.row_key = array[0]
  cell.column_family = array[1]
  cell.column_qualifier = qualifier unless qualifier.blank?
  cell.value = value if value
  cell.timestamp = timestamp if timestamp
  cell
end

#with_scanner(table_name, scan_spec, &block) ⇒ Object



523
524
525
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 523

# Delegates to @connection.with_scanner, forwarding +table_name+,
# +scan_spec+ and the caller's block.
# NOTE(review): presumably the scanner is yielded to the block and closed
# afterwards - confirm against the Thrift client API.
def with_scanner(table_name, scan_spec, &block)
  @connection.with_scanner(table_name, scan_spec, &block)
end

#with_thrift_client ⇒ Object



54
55
56
57
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 54

# Calls @connection.with_thrift_client with the :host, :port and :timeout
# settings of this adapter's configuration, forwarding the caller's block
# (if any).
#
# All three settings are read from @config, the configuration stored by
# #initialize; :port and :timeout used to be read through a bare +config+
# name that this class does not define.
def with_thrift_client(&block)
  @connection.with_thrift_client(@config[:host], @config[:port],
    @config[:timeout], &block)
end

#write_cells(table_name, cells, mutator = nil) ⇒ Object



411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
# File 'lib/active_record/connection_adapters/hypertable_adapter.rb', line 411

# Writes +cells+ (in native array form, see cell_native_array) to
# +table_name+.
#
# mutator - an already open mutator to write through. When nil, a mutator is
#           opened for this call and closed (with flush) afterwards; a
#           mutator passed in by the caller is left open.
#
# Does nothing when +cells+ is blank. The write runs inside
# retry_on_connection_error, and the elapsed time of every attempt is added
# to the class-wide write latency counter.
#
# Returns the result of @connection.set_cells_as_arrays.
def write_cells(table_name, cells, mutator=nil)
  return if cells.blank?

  retry_on_connection_error {
    # Work on a block-local copy. Assigning to the +mutator+ parameter here
    # would carry a mutator opened during a failed attempt over into the
    # retry, which would then write through that stale mutator (it belongs
    # to the connection that was just closed) and never close it.
    active_mutator = mutator
    local_mutator_created = !active_mutator
    t1 = Time.now

    begin
      active_mutator ||= @connection.open_mutator(table_name)
      @connection.set_cells_as_arrays(active_mutator, cells)
    ensure
      # active_mutator is still nil when open_mutator itself failed.
      @connection.close_mutator(active_mutator, true) if local_mutator_created && active_mutator
      @@write_latency += Time.now - t1
    end
  }
end