Class: Sq::Dbsync::Database::Mysql

Inherits:
Delegator
  • Object
show all
Includes:
Common
Defined in:
lib/sq/dbsync/database/mysql.rb

Overview

Decorator around a Sequel database object, providing some non-standard extensions required for effective ETL with MySQL.

Constant Summary collapse

AUX_TIME_BUFFER =

Two days (expressed in seconds), chosen as an arbitrary buffer for the auxiliary-timestamp filter in #delete_recent.

60 * 60 * 24 * 2

Constants included from Common

Common::SQD

Instance Method Summary collapse

Methods included from Common

#__getobj__, #__setobj__, #ensure_connection, #extract_incrementally_to_file, #extract_to_file, #hash_schema, #name

Constructor Details

#initialize(db) ⇒ Mysql

Returns a new instance of Mysql.



19
20
21
22
# File 'lib/sq/dbsync/database/mysql.rb', line 19

# Decorates a Sequel database handle.
#
# @param db the Sequel database object being wrapped. It is handed to the
#   Delegator superclass (so unknown methods fall through to it) and is also
#   kept in @db.
def initialize(db)
  super(db)
  @db = db
end

Instance Method Details

#consistency_check(table_name, t) ⇒ Object



83
84
85
86
87
88
# File 'lib/sq/dbsync/database/mysql.rb', line 83

# Counts the rows of +table_name+ whose created_at lies in the one-hour
# window ending at +t+ (both ends inclusive, as SQL BETWEEN is).
#
# @param table_name [Symbol, String] table to inspect
# @param t end of the window; presumably a Time, since 60 * 60 seconds are
#   subtracted from it — TODO confirm against callers
# @return the dataset's row count
def consistency_check(table_name, t)
  ensure_connection
  window_start = t - 60 * 60
  recent_rows  = db[table_name].filter("created_at BETWEEN ? AND ?", window_start, t)
  recent_rows.count
end

#delete_recent(plan, since) ⇒ Object

Deletes recent rows based on the primary timestamp column. It can also filter on an auxiliary timestamp column, for cases where the primary column is not indexed on the target (for example the DFR reports, where imported_at is not indexed but the reporting date is).



68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/sq/dbsync/database/mysql.rb', line 68

# Deletes rows of plan.table_name whose plan.timestamp column is later than
# +since+.
#
# When plan.aux_timestamp_column is set, the delete is further restricted to
# rows whose auxiliary column is later than since - AUX_TIME_BUFFER. The
# extra predicate is there so the target database can use an index on the
# auxiliary column when the primary timestamp column is not indexed.
#
# @param plan object responding to table_name, timestamp and
#   aux_timestamp_column
# @param since lower bound (exclusive) for the primary timestamp
# @return the result of Sequel's Dataset#delete (typically the number of
#   deleted rows)
def delete_recent(plan, since)
  ensure_connection
  scope = db[plan.table_name].filter("#{plan.timestamp} > ?", since)
  return scope.delete unless plan.aux_timestamp_column

  scope.filter(
    "#{plan.aux_timestamp_column} > ?",
    since - AUX_TIME_BUFFER
  ).delete
end

#drop_table(table_name) ⇒ Object



101
102
103
# File 'lib/sq/dbsync/database/mysql.rb', line 101

# Drops +table_name+ through the wrapped Sequel database.
#
# This method does not call ensure_connection itself; switch_table, for
# example, calls it before reaching here.
#
# @param table_name [Symbol, String] table to drop
# @return whatever the underlying drop_table returns
def drop_table(table_name)
  connection = db
  connection.drop_table(table_name)
end

#inspect ⇒ Object



24
# File 'lib/sq/dbsync/database/mysql.rb', line 24

def inspect; "#<Database::Mysql #{opts[:database]}>"; end

#load_from_file(table_name, columns, file_name) ⇒ Object



26
27
28
29
30
31
32
33
34
# File 'lib/sq/dbsync/database/mysql.rb', line 26

# Bulk-loads +file_name+ into +table_name+ with MySQL's LOAD DATA INFILE.
# The IGNORE modifier makes MySQL skip input rows that would duplicate an
# existing unique key instead of failing.
#
# @param table_name [Symbol, String] destination table (interpolated as-is)
# @param columns column list, rendered by escape_columns
# @param file_name [String, #to_s] path of the data file; without LOCAL,
#   MySQL reads it on the server host
def load_from_file(table_name, columns, file_name)
  ensure_connection
  # Quote the path through Sequel rather than wrapping it in bare single
  # quotes, so a name containing a quote or backslash cannot break (or inject
  # into) the statement. Paths without special characters render identically.
  sql = "LOAD DATA INFILE %s IGNORE INTO TABLE %s (%s)" % [
    db.literal(file_name.to_s),
    table_name,
    escape_columns(columns)
  ]
  db.run sql
end

#load_incrementally_from_file(table_name, columns, file_name) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/sq/dbsync/database/mysql.rb', line 40

# Loads +file_name+ into +table_name+ with LOAD DATA INFILE ... REPLACE, so
# input rows that match an existing unique key replace the stored row.
#
# Lock-wait timeouts and deadlocks are re-raised as TransientError (keeping
# the original message and backtrace). Any other Sequel::DatabaseError is
# re-raised unchanged.
#
# NOTE(review): set_lock_timeout(10) is never reset here. If it sets a
# session-level value, later statements on the same connection inherit it.
# Confirm with lock_timeout_sql.
#
# @param table_name [Symbol, String] destination table (interpolated as-is)
# @param columns column list, rendered by escape_columns
# @param file_name [String, #to_s] path of the data file to load
# @raise [TransientError] on lock wait timeout or deadlock
def load_incrementally_from_file(table_name, columns, file_name)
  ensure_connection
  # Very low lock wait timeout, since we don't want loads to be blocked
  # waiting for long queries.
  set_lock_timeout(10)
  # Quote the path through Sequel so quotes/backslashes in it cannot break
  # the statement; ordinary paths render exactly as before.
  db.run "LOAD DATA INFILE %s REPLACE INTO TABLE %s (%s)" % [
    db.literal(file_name.to_s),
    table_name,
    escape_columns(columns)
  ]
rescue Sequel::DatabaseError => e
  transient_regex =
    /Lock wait timeout exceeded|Deadlock found when trying to get lock/

  if e.message =~ transient_regex
    raise TransientError, e.message, e.backtrace
  else
    raise
  end
end

#set_lock_timeout(seconds) ⇒ Object



36
37
38
# File 'lib/sq/dbsync/database/mysql.rb', line 36

# Executes the statement built by lock_timeout_sql(seconds) on the wrapped
# database.
#
# @param seconds [Integer] timeout value handed to lock_timeout_sql
# @return whatever db.run returns
def set_lock_timeout(seconds)
  connection = db
  connection.run(lock_timeout_sql(seconds))
end

#switch_table(to_replace, new_table) ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/sq/dbsync/database/mysql.rb', line 105

# Moves +new_table+ into place under the name +to_replace+.
#
# If +to_replace+ already exists, a single RENAME TABLE statement renames it
# to "old_<to_replace>" and renames +new_table+ to +to_replace+. The old copy
# is dropped afterwards.
#
# If a previous run died between the RENAME and the DROP, "old_<to_replace>"
# is still present. Every later switch would then fail because the rename
# target already exists. Any such leftover is therefore dropped first.
#
# @param to_replace [Symbol, String] name the new table should end up with
# @param new_table [Symbol, String] freshly built table to move into place
def switch_table(to_replace, new_table)
  ensure_connection

  to_replace = to_replace.to_s
  old_table  = 'old_' + to_replace

  # Clean up the remains of an interrupted earlier switch (see above).
  drop_table(old_table) if table_exists?(old_table)

  renames = []
  drops   = []

  if table_exists?(to_replace)
    renames << [to_replace, old_table]
    drops << old_table
  end
  renames << [new_table, to_replace]

  db.run <<-SQL
    RENAME TABLE #{renames.map {|tables| "%s TO %s" % tables }.join(', ')}
  SQL

  drops.each { |table| drop_table(table) }
end

#table_exists?(table_name) ⇒ Boolean

Overridden because Sequel's implementation does not work when the user has only partial permissions on a table. See: https://github.com/jeremyevans/sequel/issues/422

Returns:

  • (Boolean)


93
94
95
96
97
98
99
# File 'lib/sq/dbsync/database/mysql.rb', line 93

# Reports whether +table_name+ exists by asking Sequel for its schema, with a
# fresh lookup instead of the cache (reload: true).
#
# Any Sequel::DatabaseError raised while fetching the schema is treated as
# "does not exist".
#
# @param table_name [Symbol, String] table to look for
# @return [Boolean]
def table_exists?(table_name)
  db.schema(table_name, reload: true) ? true : false
rescue Sequel::DatabaseError
  false
end