Class: Flydata::Command::Sync
- Inherits:
-
Base
- Object
- Base
- Flydata::Command::Sync
show all
- Includes:
- Helpers
- Defined in:
- lib/flydata/command/sync.rb
Constant Summary
collapse
- INSERT_PROGRESS_INTERVAL =
1000
- STATUS_PARSING =
'PARSING'
- STATUS_WAITING =
'WAITING'
- STATUS_COMPLETE =
'COMPLETE'
Constants included
from Helpers
Helpers::UNIT_PREFIX
Instance Attribute Summary
Attributes inherited from Base
#opts
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from Helpers
as_size, development?, env_mode, env_suffix, flydata_api_host_file, flydata_conf_file, flydata_version, format_menu_list, parse_command, retry_on, to_command_class, usage_text
Methods inherited from Base
#ask_input_table_name, #ask_yes_no, #choose_one, #flydata, #initialize, #newline, #register_crontab, #retrieve_data_entries, #separator
#before_logging, #log_error_stderr, #log_info_stdout, #log_warn_stderr
Class Method Details
.slop ⇒ Object
24
25
26
27
28
29
30
31
32
|
# File 'lib/flydata/command/sync.rb', line 24
# Builds the Slop option declarations for this command class.
#
# Declares five boolean-style switches: -c/--skip-cleanup, -y/--yes,
# -d/--dump-file, -s/--dump-stream and -n/--no-flydata-start.
#
# Each `on` declaration must be its own statement; two declarations on a
# single line do not parse.
#
# @return [Slop] the configured option set
def self.slop
  Slop.new do
    on 'c', 'skip-cleanup', 'Skip server cleanup'
    on 'y', 'yes', 'Skip command prompt assuming yes to all questions. Use this for batch operation.'
    on 'd', 'dump-file', 'Dump mysqldump into a file. Use this for debugging after making sure the free space.'
    on 's', 'dump-stream', 'Dump mysqldump stream instead of saving dump file. It might cause timeout error if db size is larger than 10GB.'
    on 'n', 'no-flydata-start', 'Don\'t start the flydata agent after initial sync.'
  end
end
|
.slop_flush ⇒ Object
54
55
56
57
58
|
# File 'lib/flydata/command/sync.rb', line 54
# Builds the Slop option declarations used by the flush sub-command.
#
# Only a single switch is declared: -y/--yes.
#
# @return [Slop] the configured option set
def self.slop_flush
  yes_help = 'Skip command prompt assuming yes to all questions. Use this for batch operation.'
  Slop.new { on('y', 'yes', yes_help) }
end
|
.slop_generate_table_ddl ⇒ Object
153
154
155
156
157
158
159
|
# File 'lib/flydata/command/sync.rb', line 153
# Builds the Slop option declarations used by the generate_table_ddl
# sub-command.
#
# Declares -c/--ctl-only, -y/--yes and -s/--skip-primary-key-check, in that
# order.
#
# @return [Slop] the configured option set
def self.slop_generate_table_ddl
  switches = [
    ['c', 'ctl-only', 'Only generate FlyData Control definitions'],
    ['y', 'yes', 'Skip command prompt assuming yes to all questions. Use this for batch operation.'],
    ['s', 'skip-primary-key-check', 'Skip primary key check when generating DDL']
  ]
  Slop.new do
    # `self` inside the nested block is still the object the outer block runs on.
    switches.each { |switch| on(*switch) }
  end
end
|
.slop_reset ⇒ Object
65
66
67
68
69
70
|
# File 'lib/flydata/command/sync.rb', line 65
# Builds the Slop option declarations used by the reset sub-command.
#
# Declares -c/--client followed by -y/--yes.
#
# @return [Slop] the configured option set
def self.slop_reset
  Slop.new do
    on('c', 'client', 'Resets client only.')
    on('y', 'yes', 'Skip command prompt assuming yes to all questions. Use this for batch operation.')
  end
end
|
Instance Method Details
#check ⇒ Object
129
130
131
132
133
134
135
136
137
138
139
|
# File 'lib/flydata/command/sync.rb', line 129
# Fetches the current status for the retrieved data entry.
#
# The status lookup runs inside `retry_on(RestClient::Exception)`. The block
# evaluates to nil when the status has a truthy 'complete' entry and to the
# status itself otherwise.
#
# @return [Object, nil] whatever `retry_on` returns — presumably the block's
#   value (nil once complete, else the status); confirm against `retry_on`
def check
  data_entry = retrieve_data_entry
  retry_on(RestClient::Exception) do
    current = do_check(data_entry)
    current unless current['complete']
  end
end
|
#flush ⇒ Object
60
61
62
63
|
# File 'lib/flydata/command/sync.rb', line 60
# Calls `flush_buffer_and_stop`, then reports the outcome on stdout through
# `log_info_stdout`.
#
# @return [Object] the return value of `log_info_stdout`
def flush
  flush_buffer_and_stop
  completion_notice = "Buffers have been flushed and the sender process has been stopped."
  log_info_stdout(completion_notice)
end
|
#generate_table_ddl(*tables) ⇒ Object
161
162
163
164
165
166
|
# File 'lib/flydata/command/sync.rb', line 161
# Runs the MySQL compatibility check and then generates table DDL.
#
# The data entry is retrieved first, then the data port; both feed
# `Flydata::MysqlCompatibilityCheck`. DDL generation works on the data entry
# after `override_tables` has been applied with the given table names.
#
# @param tables [Array] table names forwarded to `override_tables`
# @return [Object] the return value of `do_generate_table_ddl`
def generate_table_ddl(*tables)
  data_entry = retrieve_data_entry
  data_port = flydata.data_port.get
  mysql_preference = data_entry['mysql_data_entry_preference']
  checker = Flydata::MysqlCompatibilityCheck.new(data_port, mysql_preference)
  checker.check
  scoped_entry = override_tables(data_entry, tables)
  do_generate_table_ddl(scoped_entry)
end
|
#reset(*tables) ⇒ Object
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
# File 'lib/flydata/command/sync.rb', line 72
# Resets the current sync, optionally only for the given tables.
#
# After confirmation via `ask_yes_no`, the steps are:
# 1. flush the sender's client buffer, then stop the sender;
# 2. wait for the server buffer, and clean up the server side unless the
#    `client` option is set;
# 3. delete the local sync files reported by the SyncFileManager. The binlog
#    path is only deleted when no tables were given.
#
# @param tables [Array<String>] table names to restrict the reset to; empty
#   means everything
# @return [Object, nil] nil when the user declines, otherwise the return value
#   of `log_info_stdout`
def reset(*tables)
  msg = tables.empty? ? '' : " for these tables : #{tables.join(" ")}"
  return unless ask_yes_no("This resets the current sync#{msg}. Are you sure?")

  sender = Flydata::Command::Sender.new
  # Two separate calls: the buffer is flushed first, then the sender stops.
  sender.flush_client_buffer
  sender.stop

  de = retrieve_data_entry
  wait_for_server_buffer
  cleanup_sync_server(de, tables) unless opts.client?

  sync_fm = Flydata::FileUtil::SyncFileManager.new(de)
  begin
    delete_files = [
      sync_fm.dump_file_path,
      sync_fm.dump_pos_path,
      sync_fm.mysql_table_marshal_dump_path,
      sync_fm.sync_info_file,
      sync_fm.table_position_file_paths(*tables),
      sync_fm.table_rev_file_paths(*tables)
    ]
    delete_files << sync_fm.binlog_path if tables.empty?
    # The per-table entries are arrays, hence the flatten.
    delete_files.flatten.each do |path|
      # File.exists? was removed in Ruby 3.2; File.exist? works everywhere.
      FileUtils.rm(path) if File.exist?(path)
    end
  ensure
    # Close the manager even when a deletion raises.
    sync_fm.close
  end
  log_info_stdout("Reset completed successfully.")
end
|
#run(*tables) ⇒ Object
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
# File 'lib/flydata/command/sync.rb', line 34
# Starts a sync, optionally restricted to the given tables.
#
# If the sender process already exists, a warning is written to stderr (the
# wording depends on whether tables were given) and the process exits with
# status 1. Otherwise the data entry is retrieved, narrowed with
# `override_tables`, enriched by `load_sync_info` and validated. Unless the
# entry's 'initial_sync' preference is truthy the buffer is flushed and the
# sender stopped. Finally `sync_mysql_to_redshift` runs.
#
# @param tables [Array] table names forwarded to `override_tables` and
#   `validate_initial_sync_status`
# @return [Object] the return value of `sync_mysql_to_redshift`
def run(*tables)
  agent = Flydata::Command::Sender.new
  if agent.process_exist?
    warning =
      if tables.empty?
        "FlyData Agent is already running. If you'd like to restart FlyData Sync from scratch, run 'flydata sync:reset' first."
      else
        "Flydata Agent is already running. If you'd like to Sync the table(s), run 'flydata sync:flush' first."
      end
    log_warn_stderr(warning)
    exit 1
  end

  entry = load_sync_info(override_tables(retrieve_data_entry, tables))
  validate_initial_sync_status(entry, tables)
  initial_sync = entry['mysql_data_entry_preference']['initial_sync']
  flush_buffer_and_stop unless initial_sync
  sync_mysql_to_redshift(entry)
end
|
#skip ⇒ Object
142
143
144
145
146
147
148
149
150
151
|
# File 'lib/flydata/command/sync.rb', line 142
# Creates an empty binlog position file and tells the user how to continue.
#
# The path comes from the SyncFileManager built for the retrieved data entry.
# The file is touched with FileUtils rather than through a shell, so paths
# containing spaces or shell metacharacters are handled literally and a
# failure raises instead of passing silently.
#
# @return [Object] the return value of the last `log_info_stdout` call
def skip
  de = retrieve_data_entry
  sync_fm = Flydata::FileUtil::SyncFileManager.new(de)
  begin
    binlog_path = sync_fm.binlog_path
  ensure
    # Close the manager even if the path lookup raises.
    sync_fm.close
  end
  FileUtils.touch(binlog_path)
  log_info_stdout("Created an empty binlog position file.")
  log_info_stdout("-> #{binlog_path}")
  log_info_stdout("Run 'flydata start' to start continuous sync.")
end
|
#wait_for_server_buffer ⇒ Object
99
100
101
102
103
104
105
|
# File 'lib/flydata/command/sync.rb', line 99
# Polls `check` until it returns a falsy value or a status whose 'state' is
# no longer 'processing'.
#
# Progress is printed for each status still in 'processing', followed by a
# 10 second pause before the next poll.
#
# @return [nil]
def wait_for_server_buffer
  log_info_stdout("Waiting for the server buffer to get empty.")
  while (progress = check)
    break unless progress['state'] == 'processing'

    print_progress(progress)
    sleep 10
  end
end
|
#wait_for_server_data_processing ⇒ Object
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
# File 'lib/flydata/command/sync.rb', line 107
# Polls `check` every 10 seconds until it returns a falsy value, reporting
# the two phases on stdout.
#
# The first phase message is logged up front. The first status seen with
# 'state' == 'uploading' closes that phase (" -> Done") and announces the
# second. If no such status was ever seen, both messages are logged after the
# loop so the output always shows both phases, followed by a final " -> Done".
#
# @return [Object] the return value of the last `log_info_stdout` call
def wait_for_server_data_processing
  upload_announced = false
  log_info_stdout("Uploading data to Redshift...")
  sleep 10

  while (progress = check)
    if !upload_announced && progress['state'] == 'uploading'
      log_info_stdout(" -> Done")
      upload_announced = true
      log_info_stdout("Finishing data upload...")
    end
    print_progress(progress)
    sleep 10
  end

  unless upload_announced
    log_info_stdout(" -> Done")
    log_info_stdout("Finishing data upload...")
  end
  log_info_stdout(" -> Done")
end
|