Class: Flydata::Command::Sync
- Inherits:
-
Base
- Object
- Base
- Flydata::Command::Sync
show all
- Includes:
- Helpers
- Defined in:
- lib/flydata/command/sync.rb
Constant Summary
collapse
- CREATE_TABLE_OPTION =
!!(ENV['FLYDATA_CREATE_TABLE_OPTION']) || false
- INSERT_PROGRESS_INTERVAL =
1000
- STATUS_PARSING =
'PARSING'
- STATUS_COMPLETE =
'COMPLETE'
Instance Attribute Summary
Attributes inherited from Base
#opts
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from Helpers
development?, env_mode, env_suffix, flydata_api_host_file, flydata_conf_file, format_menu_list, parse_command, retry_on, to_command_class, usage_text
Methods inherited from Base
#ask_input_table_name, #ask_yes_no, #choose_one, #flydata, #initialize, #newline, #register_crontab, #retrieve_data_entries, #separator
Class Method Details
.slop ⇒ Object
21
22
23
24
25
|
# File 'lib/flydata/command/sync.rb', line 21
# Builds the command-line option parser for this command.
#
# Registers a single flag: `-c` / `--skip-cleanup` ("Skip server cleanup").
#
# @return [Slop] the configured option parser
def self.slop
  Slop.new { on('c', 'skip-cleanup', 'Skip server cleanup') }
end
|
.slop_generate_table_ddl ⇒ Object
134
135
136
137
138
|
# File 'lib/flydata/command/sync.rb', line 134
# Builds the option parser used together with #generate_table_ddl
# (presumably wired up by the command dispatcher via the `slop_<name>`
# naming convention -- confirm against the caller).
#
# Registers a single flag: `-c` / `--ctl-only`
# ("Only generate FlyData Control definitions").
#
# @return [Slop] the configured option parser
def self.slop_generate_table_ddl
  Slop.new { on('c', 'ctl-only', 'Only generate FlyData Control definitions') }
end
|
.slop_reset ⇒ Object
50
51
52
53
54
|
# File 'lib/flydata/command/sync.rb', line 50
# Builds the option parser used together with #reset.
#
# Registers a single flag: `-c` / `--client` ("Resets client only.").
# #reset reads it back through `opts.client?`.
#
# @return [Slop] the configured option parser
def self.slop_reset
  Slop.new { on('c', 'client', 'Resets client only.') }
end
|
Instance Method Details
#check ⇒ Object
111
112
113
114
115
116
117
118
119
120
121
|
# File 'lib/flydata/command/sync.rb', line 111
# Fetches the current sync status for the data entry.
#
# The status lookup (`do_check`) runs inside `retry_on(RestClient::Exception)`.
# The block yields nil once the status reports a truthy 'complete' value,
# otherwise the status itself, which lets callers poll with
# `while (status = check)`.
#
# @return [Object, nil] whatever `retry_on` returns; presumably the block's
#   value, i.e. nil when complete or the status hash otherwise
def check
  data_entry = retrieve_data_entry
  retry_on(RestClient::Exception) do
    current = do_check(data_entry)
    current unless current['complete']
  end
end
|
#flush ⇒ Object
45
46
47
48
|
# File 'lib/flydata/command/sync.rb', line 45
# Runs `flush_buffer_and_stop` (defined elsewhere) and then prints a
# confirmation message to standard output.
#
# @return [nil]
def flush
  flush_buffer_and_stop
  notice = "Buffers have been flushed and the sender process has been stopped."
  puts(notice)
end
|
#generate_table_ddl(*tables) ⇒ Object
140
141
142
143
144
|
# File 'lib/flydata/command/sync.rb', line 140
# Generates table DDL for the data entry, optionally limited to +tables+.
#
# Steps, in order:
# 1. retrieve the data entry;
# 2. run Flydata::Mysql::CompatibilityCheck against the entry's
#    'mysql_data_entry_preference';
# 3. apply `override_tables` and hand the result to `do_generate_table_ddl`.
#
# @param tables [Array] table names forwarded to `override_tables`
# @return [Object] whatever `do_generate_table_ddl` returns
def generate_table_ddl(*tables)
  data_entry = retrieve_data_entry
  preference = data_entry['mysql_data_entry_preference']
  Flydata::Mysql::CompatibilityCheck.new(preference).check

  target_entry = override_tables(data_entry, tables)
  do_generate_table_ddl(target_entry)
end
|
#reset(*tables) ⇒ Object
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
# File 'lib/flydata/command/sync.rb', line 56
# Resets the current sync, either entirely or for the given +tables+.
#
# After the user confirms, this:
# 1. flushes the client buffer and then stops the sender process;
# 2. retrieves the data entry and waits for the server buffer;
# 3. cleans up the server side unless the `client` option is set;
# 4. deletes the local sync state files reported by SyncFileManager. The
#    binlog path is only deleted when no tables are given (full reset).
#
# @param tables [Array<String>] table names to reset; empty means everything
# @return [Array, nil] the flattened list of candidate paths (the value of
#   `each`), or nil when the user declines the confirmation
def reset(*tables)
  msg = tables.empty? ? '' : " for these tables : #{tables.join(" ")}"
  return unless ask_yes_no("This resets the current sync#{msg}. Are you sure?")

  sender = Flydata::Command::Sender.new
  # These must be two separate statements: written on one line,
  # `flush_client_buffer sender.stop` stops the sender first and passes the
  # result of `stop` to `flush_client_buffer` as an argument.
  sender.flush_client_buffer
  sender.stop

  de = retrieve_data_entry
  wait_for_server_buffer
  cleanup_sync_server(de, tables) unless opts.client?

  sync_fm = Flydata::FileUtil::SyncFileManager.new(de)
  delete_files = [
    sync_fm.dump_file_path,
    sync_fm.dump_pos_path,
    sync_fm.mysql_table_marshal_dump_path,
    sync_fm.sync_info_file,
    sync_fm.table_position_file_paths(*tables),
    sync_fm.table_rev_file_paths(*tables)
  ]
  # Keep the binlog position when only some tables are being reset.
  delete_files << sync_fm.binlog_path if tables.empty?

  delete_files.flatten.each do |path|
    # File.exists? was removed in Ruby 3.2; File.exist? works on all versions.
    FileUtils.rm(path) if File.exist?(path)
  end
end
|
#run(*tables) ⇒ Object
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
# File 'lib/flydata/command/sync.rb', line 27
# Starts a sync from MySQL to Redshift, optionally limited to +tables+.
#
# If a sender process already exists, prints a hint (which one depends on
# whether any tables were given) and exits the process with status 1.
# Otherwise it loads the sync info for the (possibly table-overridden) data
# entry, calls `flush_buffer_and_stop` unless the entry's
# 'mysql_data_entry_preference' has a truthy 'initial_sync', and finally
# hands the entry to `sync_mysql_to_redshift`.
#
# @param tables [Array<String>] table names forwarded to `override_tables`
# @return [Object] whatever `sync_mysql_to_redshift` returns
def run(*tables)
  if Flydata::Command::Sender.new.process_exist?
    hint =
      if tables.empty?
        "FlyData Agent is already running. If you'd like to restart FlyData Sync from scratch, run 'flydata sync:reset' first."
      else
        "Flydata Agent is already running. If you'd like to Sync the table(s), run 'flydata sync:flush' first."
      end
    puts hint
    exit 1
  end

  data_entry = load_sync_info(override_tables(retrieve_data_entry, tables))
  initial_sync = data_entry['mysql_data_entry_preference']['initial_sync']
  flush_buffer_and_stop unless initial_sync
  sync_mysql_to_redshift(data_entry)
end
|
#skip ⇒ Object
124
125
126
127
128
129
130
131
132
|
# File 'lib/flydata/command/sync.rb', line 124
# Creates the binlog position file (empty if it does not exist yet) at the
# path reported by SyncFileManager, then prints where it is and how to
# continue.
#
# The file is created with FileUtils.touch rather than by shelling out to
# `touch`, so paths containing spaces or shell metacharacters are handled
# correctly and a failure raises instead of being silently ignored.
#
# @return [nil]
# @raise [SystemCallError] if the file cannot be created or updated
def skip
  de = retrieve_data_entry
  sync_fm = Flydata::FileUtil::SyncFileManager.new(de)
  binlog_path = sync_fm.binlog_path
  FileUtils.touch(binlog_path)
  puts "Created an empty binlog position file."
  puts "-> #{binlog_path}"
  puts "Run 'flydata start' to start continuous sync."
end
|
#wait_for_server_buffer ⇒ Object
81
82
83
84
85
86
87
|
# File 'lib/flydata/command/sync.rb', line 81
# Blocks until the server stops reporting the 'processing' state.
#
# Polls #check; while it returns a status whose 'state' is 'processing',
# prints progress and sleeps 10 seconds before polling again. Stops as soon
# as #check returns nil/false or a status in any other state.
#
# @return [nil]
def wait_for_server_buffer
  puts "Waiting for the server buffer to get empty"
  progress = check
  while progress && progress['state'] == 'processing'
    print_progress(progress)
    sleep 10
    progress = check
  end
end
|
#wait_for_server_data_processing ⇒ Object
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
# File 'lib/flydata/command/sync.rb', line 89
# Blocks until #check stops returning a status, printing stage messages.
#
# Sleeps 10 seconds up front, then polls #check every 10 seconds. The first
# time a status with 'state' == 'uploading' is seen, the first stage is
# reported as done and the "Finishing data upload..." stage is announced.
# If polling ends without ever seeing that state, both messages are printed
# afterwards so the output always contains the same sequence of stages.
#
# @return [nil]
def wait_for_server_data_processing
  upload_announced = false
  puts "Uploading data to Redshift..."
  sleep 10

  progress = check
  while progress
    if !upload_announced && progress['state'] == 'uploading'
      puts " -> Done"
      puts "Finishing data upload..."
      upload_announced = true
    end
    print_progress(progress)
    sleep 10
    progress = check
  end

  unless upload_announced
    puts " -> Done"
    puts "Finishing data upload..."
  end
  puts " -> Done"
end
|