Class: Blazer::Adapters::SqlAdapter

Inherits:
BaseAdapter show all
Defined in:
lib/blazer/adapters/sql_adapter.rb

Direct Known Subclasses

SnowflakeAdapter

Instance Attribute Summary collapse

Attributes inherited from BaseAdapter

#data_source

Instance Method Summary collapse

Constructor Details

#initialize(data_source) ⇒ SqlAdapter

Returns a new instance of SqlAdapter.



6
7
8
9
10
11
12
13
14
15
16
# File 'lib/blazer/adapters/sql_adapter.rb', line 6

def initialize(data_source)
  super

  @connection_model =
    Class.new(Blazer::Connection) do
      def self.name
        "Blazer::Connection::Adapter#{object_id}"
      end
      establish_connection(data_source.settings["url"]) if data_source.settings["url"]
    end
end

Instance Attribute Details

#connection_modelObject (readonly)

Returns the value of attribute connection_model.



4
5
6
# File 'lib/blazer/adapters/sql_adapter.rb', line 4

def connection_model
  @connection_model
end

Instance Method Details

#cachable?(statement) ⇒ Boolean

Returns:

  • (Boolean)


110
111
112
# File 'lib/blazer/adapters/sql_adapter.rb', line 110

def cachable?(statement)
  !%w[CREATE ALTER UPDATE INSERT DELETE].include?(statement.split.first.to_s.upcase)
end

#cancel(run_id) ⇒ Object



99
100
101
102
103
104
105
106
107
108
# File 'lib/blazer/adapters/sql_adapter.rb', line 99

def cancel(run_id)
  if postgresql?
    select_all("SELECT pg_cancel_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND query LIKE '%,run_id:#{run_id}%'")
  elsif redshift?
    first_row = select_all("SELECT pid FROM stv_recents WHERE status = 'Running' AND query LIKE '%,run_id:#{run_id}%'").first
    if first_row
      select_all("CANCEL #{first_row["pid"].to_i}")
    end
  end
end

#cost(statement) ⇒ Object



73
74
75
76
77
78
79
80
81
# File 'lib/blazer/adapters/sql_adapter.rb', line 73

def cost(statement)
  result = explain(statement)
  if sqlserver?
    result["TotalSubtreeCost"]
  else
    match = /cost=\d+\.\d+..(\d+\.\d+) /.match(result)
    match[1] if match
  end
end

#explain(statement) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/blazer/adapters/sql_adapter.rb', line 83

def explain(statement)
  if postgresql? || redshift?
    select_all("EXPLAIN #{statement}").rows.first.first
  elsif sqlserver?
    begin
      execute("SET SHOWPLAN_ALL ON")
      result = select_all(statement).each.first
    ensure
      execute("SET SHOWPLAN_ALL OFF")
    end
    result
  end
rescue
  nil
end

#preview_statementObject



59
60
61
62
63
64
65
66
67
# File 'lib/blazer/adapters/sql_adapter.rb', line 59

def preview_statement
  if postgresql?
    "SELECT * FROM \"{table}\" LIMIT 30"
  elsif sqlserver?
    "SELECT TOP (30) * FROM {table}"
  else
    "SELECT *\nFROM {table}\nORDER BY {table}.id DESC\nLIMIT 30"
  end
end

#reconnectObject



69
70
71
# File 'lib/blazer/adapters/sql_adapter.rb', line 69

def reconnect
  connection_model.establish_connection(settings["url"])
end

#run_statement(statement, comment) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/blazer/adapters/sql_adapter.rb', line 18

def run_statement(statement, comment)
  columns = []
  rows = []
  error = nil

  begin
    connection_model.connection
  rescue ActiveRecord::ConnectionNotEstablished
    reconnect
  end

  begin
    in_transaction do
      set_timeout(data_source.timeout) if data_source.timeout

      result = select_all("#{statement} /*#{comment}*/")
      columns = result.columns
      cast_method = Rails::VERSION::MAJOR < 5 ? :type_cast : :cast_value
      result.rows.each do |untyped_row|
        rows << (result.column_types.empty? ? untyped_row : columns.each_with_index.map { |c, i| untyped_row[i] ? result.column_types[c].send(cast_method, untyped_row[i]) : untyped_row[i] })
      end
    end
  rescue => e
    error = e.message.sub(/.+ERROR: /, "")
    error = Blazer::TIMEOUT_MESSAGE if Blazer::TIMEOUT_ERRORS.any? { |e| error.include?(e) }
    reconnect if error.include?("PG::ConnectionBad")
  end

  [columns, rows, error]
end

#schemaObject



54
55
56
57
# File 'lib/blazer/adapters/sql_adapter.rb', line 54

def schema
  result = data_source.run_statement(connection_model.send(:sanitize_sql_array, ["SELECT table_schema, table_name, column_name, data_type, ordinal_position FROM information_schema.columns WHERE table_schema IN (?) ORDER BY 1, 2", schemas]))
  result.rows.group_by { |r| [r[0], r[1]] }.map { |k, vs| {schema: k[0], table: k[1], columns: vs.sort_by { |v| v[2] }.map { |v| {name: v[2], data_type: v[3]} }} }
end

#tablesObject



49
50
51
52
# File 'lib/blazer/adapters/sql_adapter.rb', line 49

def tables
  result = data_source.run_statement(connection_model.send(:sanitize_sql_array, ["SELECT table_name FROM information_schema.tables WHERE table_schema IN (?) ORDER BY table_name", schemas]), refresh_cache: true)
  result.rows.map(&:first)
end