24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
# File 'lib/coupler/models/scenario/runner.rb', line 24
# Runs the scenario's matching process and stores the results in the
# scenario's local database (obtained from @parent.local_database).
#
# Phase one starts one thread per resource via #phase_one_thread and waits for
# all of them. Each thread receives the resource's final dataset, its primary
# key, and `which`: nil for 'self-linkage' scenarios, otherwise the resource's
# index. The resource databases used in phase one are always disconnected and
# removed from ::Sequel::DATABASES afterwards, even when phase one raises.
#
# For any other scenario type, phase two (#phase_two) compares the phase-one
# groups and records matched group pairs in the secondary groups table. Each
# pair is then merged under a fresh id from #get_next_group_id. Afterwards,
# every group and join row whose id is at or below the last phase-one group
# number is deleted, and the secondary groups table is dropped.
# (Assumes #get_next_group_id hands out ids above @group_number — confirm.)
#
# Finally, each group's record count per `which` value is written to the
# matching resource_N_count column of the groups table.
def run!
  @parent.local_database do |scenario_db|
    @groups_dataset = scenario_db[@groups_table_name]
    @groups_buffer = ImportBuffer.new(@groups_column_names, @groups_dataset)
    @join_dataset = scenario_db[@join_table_name]
    @join_buffer = ImportBuffer.new([:record_id, :which, :group_id], @join_dataset)
    @pairs = @phase_one_pairs

    databases_to_close = []
    begin
      tw = ThreadsWait.new
      @resources.each_with_index do |resource, i|
        dataset = resource.final_dataset
        databases_to_close << dataset.db
        primary_key = resource.primary_key_sym
        which = @type == 'self-linkage' ? nil : i
        tw.join_nowait(phase_one_thread(dataset, primary_key, which))
      end
      tw.all_waits
    ensure
      # Release the resource databases even if phase one failed; otherwise
      # they stay registered in ::Sequel::DATABASES with open connections.
      databases_to_close.each do |db|
        db.disconnect
        ::Sequel::DATABASES.delete(db)
      end
    end

    if @type != 'self-linkage'
      secondary_groups_ds = scenario_db[@secondary_groups_table_name].order(:group_1_id, :group_2_id)
      @join_buffer = ImportBuffer.new([:group_1_id, :group_2_id], secondary_groups_ds)
      @pairs = @phase_two_pairs
      phase_two(@groups_dataset, :id)
      @join_buffer.flush

      tw = ThreadsWait.new
      page_size = 10
      count = secondary_groups_ds.count
      # Group ids at or below this value belong to phase-one groups.
      last_group_id = @group_number
      0.step(count - 1, page_size) do |offset|
        # NOTE(review): all rows of one page are merged concurrently. If two
        # rows in the same page share a group id, their updates can interleave;
        # confirm the secondary groups table never contains such rows.
        secondary_groups_ds.limit(page_size, offset).each do |row|
          thread = Thread.new(row[:group_1_id], row[:group_2_id]) do |group_1_id, group_2_id|
            new_group_id = get_next_group_id
            @join_dataset.filter(:group_id => [group_1_id, group_2_id]).update(:group_id => new_group_id)
            @groups_dataset.filter(:id => group_1_id).update(:id => new_group_id)
            @groups_dataset.filter(:id => group_2_id).delete
          end
          thread.abort_on_exception = true
          tw.join_nowait(thread)
        end
        tw.all_waits
      end

      # On Ruby 1.9+ Symbol#<= comes from Comparable and raises ArgumentError
      # for an Integer argument, so build the SQL comparison from an explicit
      # identifier instead of relying on Sequel's Symbol core extensions.
      @groups_dataset.filter(::Sequel::SQL::Identifier.new(:id) <= last_group_id).delete
      @join_dataset.filter(::Sequel::SQL::Identifier.new(:group_id) <= last_group_id).delete
      scenario_db.drop_table(@secondary_groups_table_name)
    end

    # Write per-resource record counts. A `which` of k maps to column
    # resource_(k+1)_count; rows with a nil `which` count toward resource 1.
    @join_dataset.group_and_count(:group_id, :which).each do |row|
      col = row[:which] ? :"resource_#{row[:which] + 1}_count" : :resource_1_count
      @groups_dataset.filter(:id => row[:group_id]).update(col => row[:count])
    end
  end
end
|