Class: Sq::Dbsync::IncrementalLoadAction
Overview
Load action to incrementally keep a table up-to-date by loading deltas from the source system. Note that this technique is unable by itself to detect deletes, but behaviour can be added to delete records based on a separate audit log. See documentation for more details.
Constant Summary
Constants inherited
from LoadAction
LoadAction::EPOCH
Instance Method Summary
collapse
Methods inherited from LoadAction
#call, #do_prepare, #initialize, stages, #tag
Instance Method Details
32
33
34
35
36
37
38
39
40
41
42
|
# File 'lib/sq/dbsync/incremental_load_action.rb', line 32
# NOTE(review): the method name on the `def` line and the name of the call
# inside the `measure(:extract)` block are missing from this listing
# (presumably dropped when the page was extracted) — restore them from
# lib/sq/dbsync/incremental_load_action.rb before relying on this snippet.
#
# What the visible statements do:
# * read the registry entry for plan.table_name into @metadata;
# * record the current time (now.call) in @start_time;
# * compute `since` as @metadata[:last_row_at], falling back to
#   @metadata[:last_synced_at], minus `overlap`;
# * destructure the result of the measured :extract block into @file and
#   @last_row_at;
# * return self.
def
@metadata = registry.get(plan.table_name)
@start_time = now.call
since = (
@metadata[:last_row_at] ||
@metadata[:last_synced_at]
) - overlap
@file, @last_row_at = measure(:extract) { (since) }
self
end
|
#filter_columns ⇒ Object
73
74
75
76
77
78
|
# File 'lib/sq/dbsync/incremental_load_action.rb', line 73
# Narrows plan.columns to the columns that can actually be synced.
#
# The source table's schema keys are passed through resolve_columns, and the
# result is intersected with target_columns. When target_columns is nil the
# source columns are used for the intersection instead.
#
# @return [Array] the column list that was assigned to plan.columns
def filter_columns
  available = plan.source_db.hash_schema(plan.source_table_name).keys
  requested = resolve_columns(plan, available)
  plan.columns = requested & (target_columns || available)
end
|
#load_data ⇒ Object
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
# File 'lib/sq/dbsync/incremental_load_action.rb', line 44
# Loads the previously extracted file into the target table and records the
# new sync position.
#
# Within a single db.transaction, and measured as the :load stage, this:
# 1. loads @file.path into plan.prefixed_table_name for plan.columns;
# 2. calls process_deletes;
# 3. updates the registry row for plan.table_name with @start_time and
#    @last_row_at.
#
# The extract file is released with close! whether or not the transaction
# succeeds, so a failed load no longer leaks it.
#
# @return [self]
def load_data
  measure(:load) do
    begin
      db.transaction do
        db.load_incrementally_from_file(
          plan.prefixed_table_name,
          plan.columns,
          @file.path
        )
        process_deletes
        registry.update(
          plan.table_name,
          @metadata[:last_batch_synced_at],
          last_synced_at: @start_time,
          last_row_at: @last_row_at
        )
      end
    ensure
      # Presumably a Tempfile, in which case close! also unlinks it — confirm.
      # Guarded so a missing @file cannot mask the original error.
      @file.close! if @file
    end
  end
  self
end
|
#operation ⇒ Object
10
|
# File 'lib/sq/dbsync/incremental_load_action.rb', line 10
# Name of the operation this action performs.
#
# @return [String] always 'increment'
def operation
  'increment'
end
|
#post_load ⇒ Object
65
66
67
|
# File 'lib/sq/dbsync/incremental_load_action.rb', line 65
# Post-load stage. This action does no work here and simply hands back the
# receiver.
#
# @return [self]
def post_load
  self
end
|
#prefix ⇒ Object
69
70
71
|
# File 'lib/sq/dbsync/incremental_load_action.rb', line 69
# Prefix prepended to plan.table_name when target_columns looks up the target
# schema. This action uses no prefix.
#
# @return [String] the empty string
def prefix
  ''
end
|
#prepare ⇒ Object
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
# File 'lib/sq/dbsync/incremental_load_action.rb', line 12
# Decides whether this action should run.
#
# When the inherited prepare fails, the result is false; if plan.always_sync
# is set, the registry entry is also deleted and the target table dropped.
#
# When the inherited prepare succeeds and plan.always_sync is set, the
# registry entry is (re)set with EPOCH timestamps and no last_row_at. The
# result is then true only if the registry holds an entry for the table.
#
# @return [Boolean]
def prepare
  unless super
    if plan.always_sync
      registry.delete(plan.table_name)
      target.drop_table(plan.table_name)
    end
    return false
  end

  if plan.always_sync
    registry.set(
      plan.table_name,
      last_synced_at: EPOCH,
      last_batch_synced_at: EPOCH,
      last_row_at: nil
    )
  end

  registry.get(plan.table_name) ? true : false
end
|
#process_deletes ⇒ Object
91
92
93
|
# File 'lib/sq/dbsync/incremental_load_action.rb', line 91
# Hook invoked by load_data inside the load transaction, after the file has
# been loaded. It does nothing in this class.
#
# @return [nil]
def process_deletes
  nil
end
|
#target_columns ⇒ Object
80
81
82
83
84
85
86
87
88
89
|
# File 'lib/sq/dbsync/incremental_load_action.rb', line 80
# Column names of the existing target table, if there is one.
#
# Existence is checked with plan.table_name; the schema is then read using
# the table name with `prefix` prepended.
#
# @return [Array, nil] the keys of the target table's schema, or nil when
#   the target table does not exist
def target_columns
  return nil unless target.table_exists?(plan.table_name)

  target.hash_schema("#{prefix}#{plan.table_name}").keys
end
|