Class: Sq::Dbsync::Database::Mysql
- Inherits:
-
Delegator
- Object
- Delegator
- Sq::Dbsync::Database::Mysql
show all
- Includes:
- Common
- Defined in:
- lib/sq/dbsync/database/mysql.rb
Overview
Decorator around a Sequel database object, providing some non-standard extensions required for effective ETL with MySQL.
Constant Summary
collapse
- AUX_TIME_BUFFER =
2 days is chosen as an arbitrary buffer
60 * 60 * 24 * 2
Constants included
from Common
Common::SQD
Instance Method Summary
collapse
-
#consistency_check(table_name, t) ⇒ Object
-
#delete_recent(plan, since) ⇒ Object
Deletes recent rows based on timestamp, but also allows filtering by an auxilary timestamp column for the case where the primary one is not indexed on the target (such as the DFR reports, where imported_at is not indexed, but reporting date is).
-
#drop_table(table_name) ⇒ Object
-
#initialize(db) ⇒ Mysql
constructor
-
#inspect ⇒ Object
-
#load_from_file(table_name, columns, file_name) ⇒ Object
-
#load_incrementally_from_file(table_name, columns, file_name) ⇒ Object
-
#set_lock_timeout(seconds) ⇒ Object
-
#switch_table(to_replace, new_table) ⇒ Object
-
#table_exists?(table_name) ⇒ Boolean
Overriden because the Sequel implementation does not work with partial permissions on a table.
Methods included from Common
#__getobj__, #__setobj__, #ensure_connection, #extract_incrementally_to_file, #extract_to_file, #hash_schema, #name
Constructor Details
#initialize(db) ⇒ Mysql
Returns a new instance of Mysql.
19
20
21
22
|
# File 'lib/sq/dbsync/database/mysql.rb', line 19
def initialize(db)
super
@db = db
end
|
Instance Method Details
#consistency_check(table_name, t) ⇒ Object
83
84
85
86
87
88
|
# File 'lib/sq/dbsync/database/mysql.rb', line 83
def consistency_check(table_name, t)
ensure_connection
db[table_name].
filter("created_at BETWEEN ? AND ?", t - 60*60, t).
count
end
|
#delete_recent(plan, since) ⇒ Object
Deletes recent rows based on timestamp, but also allows filtering by an auxilary timestamp column for the case where the primary one is not indexed on the target (such as the DFR reports, where imported_at is not indexed, but reporting date is).
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
# File 'lib/sq/dbsync/database/mysql.rb', line 68
def delete_recent(plan, since)
ensure_connection
query = db[plan.table_name].
filter("#{plan.timestamp} > ?", since)
if plan.aux_timestamp_column
query = query.filter(
"#{plan.aux_timestamp_column} > ?",
since - AUX_TIME_BUFFER
)
end
query.delete
end
|
#drop_table(table_name) ⇒ Object
101
102
103
|
# File 'lib/sq/dbsync/database/mysql.rb', line 101
def drop_table(table_name)
db.drop_table(table_name)
end
|
#inspect ⇒ Object
24
|
# File 'lib/sq/dbsync/database/mysql.rb', line 24
def inspect; "#<Database::Mysql #{opts[:database]}>"; end
|
#load_from_file(table_name, columns, file_name) ⇒ Object
26
27
28
29
30
31
32
33
34
|
# File 'lib/sq/dbsync/database/mysql.rb', line 26
def load_from_file(table_name, columns, file_name)
ensure_connection
sql = "LOAD DATA INFILE '%s' IGNORE INTO TABLE %s (%s)" % [
file_name,
table_name,
escape_columns(columns)
]
db.run sql
end
|
#load_incrementally_from_file(table_name, columns, file_name) ⇒ Object
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
# File 'lib/sq/dbsync/database/mysql.rb', line 40
def load_incrementally_from_file(table_name, columns, file_name)
ensure_connection
set_lock_timeout(10)
db.run "LOAD DATA INFILE '%s' REPLACE INTO TABLE %s (%s)" % [
file_name,
table_name,
escape_columns(columns)
]
rescue Sequel::DatabaseError => e
transient_regex =
/Lock wait timeout exceeded|Deadlock found when trying to get lock/
if e.message =~ transient_regex
raise TransientError, e.message, e.backtrace
else
raise
end
end
|
#set_lock_timeout(seconds) ⇒ Object
36
37
38
|
# File 'lib/sq/dbsync/database/mysql.rb', line 36
def set_lock_timeout(seconds)
db.run lock_timeout_sql(seconds)
end
|
#switch_table(to_replace, new_table) ⇒ Object
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
# File 'lib/sq/dbsync/database/mysql.rb', line 105
def switch_table(to_replace, new_table)
ensure_connection
to_replace = to_replace.to_s
renames = []
drops = []
if table_exists?(to_replace)
renames << [to_replace, 'old_' + to_replace]
drops << 'old_' + to_replace
end
renames << [new_table, to_replace]
db.run <<-SQL
RENAME TABLE #{renames.map {|tables| "%s TO %s" % tables }.join(', ')}
SQL
drops.each { |table| drop_table(table) }
end
|
#table_exists?(table_name) ⇒ Boolean
93
94
95
96
97
98
99
|
# File 'lib/sq/dbsync/database/mysql.rb', line 93
def table_exists?(table_name)
begin
!!db.schema(table_name, reload: true)
rescue Sequel::DatabaseError
false
end
end
|