Class: ActiveRecord::ConnectionAdapters::HiveAdapter

Inherits:
AbstractAdapter
  • Object
show all
Defined in:
lib/active_record/connection_adapters/hive_adapter.rb

Defined Under Namespace

Modules: TableDefinitionExtensions Classes: BindSubstitution

Constant Summary collapse

NATIVE_DATABASE_TYPES =
{
  :string      => { :name => "string" },
  :text        => { :name => "string" },
  :integer     => { :name => "int" },
  :float       => { :name => "float" },
  :double      => { :name => "double" },
  :datetime    => { :name => "string" },
  :timestamp   => { :name => "string" },
  :time        => { :name => "string" },
  :date        => { :name => "string" },
  :binary      => { :name => "string" },
  :boolean     => { :name => "tinyint" }
}

Instance Method Summary collapse

Constructor Details

#initialize(connection, logger, connection_params, config) ⇒ HiveAdapter

Returns a new instance of HiveAdapter.



84
85
86
87
88
89
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 84

def initialize(connection, logger, connection_params, config)
  super(connection, logger)
  @connection_params = connection_params
  connect
  @visitor = BindSubstitution.new(self)
end

Instance Method Details

#active?Boolean

Returns:

  • (Boolean)


104
105
106
107
108
109
110
111
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 104

def active?
  begin
    @connection.execute("SET check=1")
    true
  rescue
    false
  end
end

#adapter_nameObject

:nodoc:



119
120
121
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 119

def adapter_name #:nodoc:
  'Hive'
end

#add_column(table_name, column_name, type, options = { }) ⇒ Object



222
223
224
225
226
227
228
229
230
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 222

def add_column(table_name, column_name, type, options = { })
  sql = "ALTER TABLE #{quote_table_name(table_name)} ADD COLUMNS (#{quote_column_name(column_name)} #{type_to_sql(type)}"
  o = { }
  o[:default] = options[:default] if options[:default]
  o[:column]  = HiveColumn.new(column_name, nil, type, false)
  add_column_options!(sql, o)
  sql << ")"
  execute(sql)
end

#add_column_options!(sql, options) ⇒ Object

:nodoc:



211
212
213
214
215
216
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 211

def add_column_options!(sql, options) #:nodoc:
  meta = ""
  meta << "ar_type=#{options[:column].type}"
  meta << ",ar_default=#{options[:default]}" if options[:default]
  sql << " COMMENT '#{meta}'"
end

#add_index(table_name, column_name, options = { }) ⇒ Object

Raises:

  • (NotImplementedError)


218
219
220
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 218

def add_index(table_name, column_name, options = { })
  raise NotImplementedError
end

#columns(table, name = nil) ⇒ Object



171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 171

def columns(table, name=nil)
  res = query("DESCRIBE FORMATTED #{quote_table_name(table)}", name)

  table_info_index = res.find_index { |ln| ln.start_with?("# Detailed Table Information") }
  begin_partition = false
  columns = []
  res.slice(0, table_info_index - 1).each do |ln|
    if ln.start_with?("# Partition Information")
      begin_partition = true
      next
    end
    next if ln.strip.empty?
    next if ln.start_with?("# col_name")
    col_name, sql_type, comment = ln.split(/\s+/)
    meta = Hash[comment.to_s.split(',').map { |meta| property, value = meta.split('=') }]
    type = meta['ar_type'] || sql_type
    columns << HiveColumn.new(col_name, meta['ar_default'], type, begin_partition)
  end
  columns
end

#connectObject



91
92
93
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 91

def connect
  @connection = ActiveRecordHiveAdapter::HiveConnector.new(*@connection_params)
end

#create_table(table_name, options = { }) {|table_definition| ... } ⇒ Object

Yields:

  • (table_definition)


192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 192

def create_table(table_name, options={ })
  table_definition = TableDefinition.new(self)
  table_definition.extend(TableDefinitionExtensions)

  yield table_definition if block_given?

  if options[:force] && table_exists?(table_name)
    drop_table(table_name, options)
  end

  create_sql = "CREATE#{' EXTERNAL' if table_definition.external} TABLE "
  create_sql << "#{quote_table_name(table_name)} ("
  create_sql << table_definition.to_sql
  create_sql << ") "
  create_sql << "#{partitioned_by(table_definition.partitions)} "
  create_sql << table_definition.row_format
  execute create_sql
end

#database_nameObject



159
160
161
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 159

def database_name
  @connection.database
end

#disconnectObject



95
96
97
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 95

def disconnect
  @connection.close
end

#execute(sql, name = nil) ⇒ Object



113
114
115
116
117
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 113

def execute(sql, name=nil)
  with_auto_reconnect do
    log(sql, name) { @connection.execute(sql) }
  end
end

#native_database_typesObject



131
132
133
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 131

def native_database_types
  NATIVE_DATABASE_TYPES
end

#partitioned_by(partitions) ⇒ Object



232
233
234
235
236
237
238
239
240
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 232

def partitioned_by(partitions)
  unless partitions.to_a.empty?
    spec = "PARTITIONED BY ("
    spec << partitions.map do |p|
      options = { :default => p.default, :column => p }
      add_column_options!("#{p.name} #{p.sql_type}", options)
    end.join(", ") << ")"
  end
end

#primary_key(table_name) ⇒ Object



167
168
169
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 167

def primary_key(table_name)
  nil
end

#query(sql, name = nil) ⇒ Object



135
136
137
138
139
140
141
142
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 135

def query(sql, name=nil)
  with_auto_reconnect do
    log(sql, name) do
      @connection.execute(sql)
      @connection.fetch_all
    end
  end
end

#reconnect!Object



99
100
101
102
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 99

def reconnect!
  disconnect
  connect
end

#select(sql, name = nil, binds = []) ⇒ Object



144
145
146
147
148
149
150
151
152
153
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 144

def select(sql, name=nil, binds=[])
  with_auto_reconnect do
    log(sql, name) do
      @connection.execute(sql)
      fields = @connection.get_schema.fieldSchemas.map { |f| f.name }
      res = @connection.fetch_all
      res.map { |row| Hash[*fields.zip(row.split("\t")).flatten] }
    end
  end
end

#select_rows(sql, name = nil) ⇒ Object



155
156
157
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 155

def select_rows(sql, name=nil)
  query(sql, name)
end

#supports_migrations?Boolean

:nodoc:

Returns:

  • (Boolean)


123
124
125
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 123

def supports_migrations? #:nodoc:
  true
end

#supports_primary_key?Boolean

:nodoc:

Returns:

  • (Boolean)


127
128
129
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 127

def supports_primary_key? #:nodoc:
  false
end

#tables(name = nil) ⇒ Object



163
164
165
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 163

def tables(name=nil)
  query("SHOW TABLES", name)
end

#with_auto_reconnectObject



242
243
244
245
246
247
248
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 242

def with_auto_reconnect
  yield
rescue Thrift::TransportException => e
  raise unless e.message == "end of file reached"
  reconnect!
  yield
end