Class: Blazer::Adapters::SqlAdapter
- Inherits:
-
BaseAdapter
- Object
- BaseAdapter
- Blazer::Adapters::SqlAdapter
- Defined in:
- lib/blazer/adapters/sql_adapter.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#connection_model ⇒ Object
readonly
Returns the value of attribute connection_model.
Attributes inherited from BaseAdapter
Instance Method Summary collapse
- #cachable?(statement) ⇒ Boolean
- #cancel(run_id) ⇒ Object
- #cost(statement) ⇒ Object
- #explain(statement) ⇒ Object
-
#initialize(data_source) ⇒ SqlAdapter
constructor
A new instance of SqlAdapter.
- #preview_statement ⇒ Object
- #reconnect ⇒ Object
- #run_statement(statement, comment) ⇒ Object
- #schema ⇒ Object
- #tables ⇒ Object
Constructor Details
#initialize(data_source) ⇒ SqlAdapter
Returns a new instance of SqlAdapter.
6 7 8 9 10 11 12 13 14 15 16 |
# File 'lib/blazer/adapters/sql_adapter.rb', line 6 def initialize(data_source) super @connection_model = Class.new(Blazer::Connection) do def self.name "Blazer::Connection::Adapter#{object_id}" end establish_connection(data_source.settings["url"]) if data_source.settings["url"] end end |
Instance Attribute Details
#connection_model ⇒ Object (readonly)
Returns the value of attribute connection_model.
4 5 6 |
# File 'lib/blazer/adapters/sql_adapter.rb', line 4 def connection_model @connection_model end |
Instance Method Details
#cachable?(statement) ⇒ Boolean
110 111 112 |
# File 'lib/blazer/adapters/sql_adapter.rb', line 110 def cachable?(statement) !%w[CREATE ALTER UPDATE INSERT DELETE].include?(statement.split.first.to_s.upcase) end |
#cancel(run_id) ⇒ Object
99 100 101 102 103 104 105 106 107 108 |
# File 'lib/blazer/adapters/sql_adapter.rb', line 99 def cancel(run_id) if postgresql? select_all("SELECT pg_cancel_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND query LIKE '%,run_id:#{run_id}%'") elsif redshift? first_row = select_all("SELECT pid FROM stv_recents WHERE status = 'Running' AND query LIKE '%,run_id:#{run_id}%'").first if first_row select_all("CANCEL #{first_row["pid"].to_i}") end end end |
#cost(statement) ⇒ Object
73 74 75 76 77 78 79 80 81 |
# File 'lib/blazer/adapters/sql_adapter.rb', line 73 def cost(statement) result = explain(statement) if sqlserver? result["TotalSubtreeCost"] else match = /cost=\d+\.\d+..(\d+\.\d+) /.match(result) match[1] if match end end |
#explain(statement) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/blazer/adapters/sql_adapter.rb', line 83 def explain(statement) if postgresql? || redshift? select_all("EXPLAIN #{statement}").rows.first.first elsif sqlserver? begin execute("SET SHOWPLAN_ALL ON") result = select_all(statement).each.first ensure execute("SET SHOWPLAN_ALL OFF") end result end rescue nil end |
#preview_statement ⇒ Object
59 60 61 62 63 64 65 66 67 |
# File 'lib/blazer/adapters/sql_adapter.rb', line 59 def preview_statement if postgresql? "SELECT * FROM \"{table}\" LIMIT 30" elsif sqlserver? "SELECT TOP (30) * FROM {table}" else "SELECT *\nFROM {table}\nORDER BY {table}.id DESC\nLIMIT 30" end end |
#reconnect ⇒ Object
69 70 71 |
# File 'lib/blazer/adapters/sql_adapter.rb', line 69 def reconnect connection_model.establish_connection(settings["url"]) end |
#run_statement(statement, comment) ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/blazer/adapters/sql_adapter.rb', line 18 def run_statement(statement, comment) columns = [] rows = [] error = nil begin connection_model.connection rescue ActiveRecord::ConnectionNotEstablished reconnect end begin in_transaction do set_timeout(data_source.timeout) if data_source.timeout result = select_all("#{statement} /*#{comment}*/") columns = result.columns cast_method = Rails::VERSION::MAJOR < 5 ? :type_cast : :cast_value result.rows.each do |untyped_row| rows << (result.column_types.empty? ? untyped_row : columns.each_with_index.map { |c, i| untyped_row[i] ? result.column_types[c].send(cast_method, untyped_row[i]) : untyped_row[i] }) end end rescue => e error = e..sub(/.+ERROR: /, "") error = Blazer::TIMEOUT_MESSAGE if Blazer::TIMEOUT_ERRORS.any? { |e| error.include?(e) } reconnect if error.include?("PG::ConnectionBad") end [columns, rows, error] end |
#schema ⇒ Object
54 55 56 57 |
# File 'lib/blazer/adapters/sql_adapter.rb', line 54 def schema result = data_source.run_statement(connection_model.send(:sanitize_sql_array, ["SELECT table_schema, table_name, column_name, data_type, ordinal_position FROM information_schema.columns WHERE table_schema IN (?) ORDER BY 1, 2", schemas])) result.rows.group_by { |r| [r[0], r[1]] }.map { |k, vs| {schema: k[0], table: k[1], columns: vs.sort_by { |v| v[2] }.map { |v| {name: v[2], data_type: v[3]} }} } end |
#tables ⇒ Object
49 50 51 52 |
# File 'lib/blazer/adapters/sql_adapter.rb', line 49 def tables result = data_source.run_statement(connection_model.send(:sanitize_sql_array, ["SELECT table_name FROM information_schema.tables WHERE table_schema IN (?) ORDER BY table_name", schemas]), refresh_cache: true) result.rows.map(&:first) end |