Class: Flydata::Command::Sync

Inherits:
Base
  • Object
show all
Includes:
Helpers
Defined in:
lib/flydata/command/sync.rb

Constant Summary collapse

INSERT_PROGRESS_INTERVAL =
1000
STATUS_PARSING =

for dump.pos file

'PARSING'
STATUS_WAITING =
'WAITING'
STATUS_COMPLETE =
'COMPLETE'

Constants included from Helpers

Helpers::UNIT_PREFIX

Instance Attribute Summary

Attributes inherited from Base

#opts

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Helpers

as_size, development?, env_mode, env_suffix, flydata_api_host_file, flydata_conf_file, flydata_version, format_menu_list, parse_command, retry_on, to_command_class, usage_text

Methods inherited from Base

#ask_input_table_name, #ask_yes_no, #choose_one, #flydata, #initialize, #newline, #register_crontab, #retrieve_data_entries, #separator

Methods included from Flydata::CommandLoggable

#before_logging, #log_error_stderr, #log_info_stdout, #log_warn_stderr

Constructor Details

This class inherits a constructor from Flydata::Command::Base

Class Method Details

.slopObject



24
25
26
27
28
29
30
31
32
# File 'lib/flydata/command/sync.rb', line 24

def self.slop
  Slop.new do
    on 'c', 'skip-cleanup', 'Skip server cleanup'
    on 'y', 'yes', 'Skip command prompt assuming yes to all questions.  Use this for batch operation.'
    on 'd', 'dump-file', 'Dump mysqldump into a file. Use this for debugging after making sure the free space.' # dummy for compatibility
    on 's', 'dump-stream', 'Dump mysqldump stream instead of saving dump file. It might cause timeout error if db size is larger than 10GB.'
    on 'n', 'no-flydata-start', 'Don\'t start the flydata agent after initial sync.'
  end
end

.slop_flushObject



54
55
56
57
58
# File 'lib/flydata/command/sync.rb', line 54

def self.slop_flush
  Slop.new do
    on 'y', 'yes', 'Skip command prompt assuming yes to all questions.  Use this for batch operation.'
  end
end

.slop_generate_table_ddlObject



153
154
155
156
157
158
159
# File 'lib/flydata/command/sync.rb', line 153

def self.slop_generate_table_ddl
  Slop.new do
    on 'c', 'ctl-only', 'Only generate FlyData Control definitions'
    on 'y', 'yes', 'Skip command prompt assuming yes to all questions.  Use this for batch operation.'
    on 's', 'skip-primary-key-check', 'Skip primary key check when generating DDL'
  end
end

.slop_resetObject



65
66
67
68
69
70
# File 'lib/flydata/command/sync.rb', line 65

def self.slop_reset
  Slop.new do
    on 'c', 'client', 'Resets client only.'
    on 'y', 'yes', 'Skip command prompt assuming yes to all questions.  Use this for batch operation.'
  end
end

Instance Method Details

#checkObject



129
130
131
132
133
134
135
136
137
138
139
# File 'lib/flydata/command/sync.rb', line 129

def check
  de = retrieve_data_entry
  retry_on(RestClient::Exception) do
    status = do_check(de)
    if status['complete']
      nil
    else
      status
    end
  end
end

#flushObject



60
61
62
63
# File 'lib/flydata/command/sync.rb', line 60

def flush
  flush_buffer_and_stop
  log_info_stdout("Buffers have been flushed and the sender process has been stopped.")
end

#generate_table_ddl(*tables) ⇒ Object



161
162
163
164
165
166
# File 'lib/flydata/command/sync.rb', line 161

def generate_table_ddl(*tables)
  de = retrieve_data_entry
  dp = flydata.data_port.get
  Flydata::MysqlCompatibilityCheck.new(dp, de['mysql_data_entry_preference']).check
  do_generate_table_ddl(override_tables(de, tables))
end

#reset(*tables) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/flydata/command/sync.rb', line 72

def reset(*tables)
  msg = tables.empty? ? '' : " for these tables : #{tables.join(" ")}"
  return unless ask_yes_no("This resets the current sync#{msg}.  Are you sure?")
  sender = Flydata::Command::Sender.new
  sender.flush_client_buffer # TODO We should rather delete buffer files
  sender.stop

  de = retrieve_data_entry
  wait_for_server_buffer
  cleanup_sync_server(de, tables) unless opts.client?
  sync_fm = Flydata::FileUtil::SyncFileManager.new(de)
  delete_files = [
    sync_fm.dump_file_path,
    sync_fm.dump_pos_path,
    sync_fm.mysql_table_marshal_dump_path,
    sync_fm.sync_info_file,
    sync_fm.table_position_file_paths(*tables),
    sync_fm.table_rev_file_paths(*tables)
  ]
  delete_files << sync_fm.binlog_path if tables.empty?
  delete_files.flatten.each do |path|
    FileUtils.rm(path) if File.exists?(path)
  end
  sync_fm.close
  log_info_stdout("Reset completed successfully.")
end

#run(*tables) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/flydata/command/sync.rb', line 34

def run(*tables)
  sender = Flydata::Command::Sender.new
  if (sender.process_exist?)
    if tables.empty?
      # full sync
      log_warn_stderr("FlyData Agent is already running.  If you'd like to restart FlyData Sync from scratch, run 'flydata sync:reset' first.")
    else
      # per-table sync
      log_warn_stderr("Flydata Agent is already running.  If you'd like to Sync the table(s), run 'flydata sync:flush' first.")
    end
    exit 1
  end

  de = retrieve_data_entry
  de = load_sync_info(override_tables(de, tables))
  validate_initial_sync_status(de, tables)
  flush_buffer_and_stop  unless de['mysql_data_entry_preference']['initial_sync']
  sync_mysql_to_redshift(de)
end

#skipObject

skip initial sync



142
143
144
145
146
147
148
149
150
151
# File 'lib/flydata/command/sync.rb', line 142

def skip
  de = retrieve_data_entry
  sync_fm = Flydata::FileUtil::SyncFileManager.new(de)
  binlog_path = sync_fm.binlog_path
  sync_fm.close
  `touch #{binlog_path}`
  log_info_stdout("Created an empty binlog position file.")
  log_info_stdout("-> #{binlog_path}")
  log_info_stdout("Run 'flydata start' to start continuous sync.")
end

#wait_for_server_bufferObject



99
100
101
102
103
104
105
# File 'lib/flydata/command/sync.rb', line 99

def wait_for_server_buffer
  log_info_stdout("Waiting for the server buffer to get empty.")
  while (status = check) && (status['state'] == 'processing')
    print_progress(status)
    sleep 10
  end
end

#wait_for_server_data_processingObject



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/flydata/command/sync.rb', line 107

def wait_for_server_data_processing
  state = :PROCESS
  log_info_stdout("Uploading data to Redshift...")
  sleep 10
  status = nil
  while (status = check)
    if state == :PROCESS && status['state'] == 'uploading'
      log_info_stdout("  -> Done")
      state = :UPLOAD
      log_info_stdout("Finishing data upload...")
    end
    print_progress(status)
    sleep 10
  end
  if (state == :PROCESS)
    # :UPLOAD state was skipped due to no data
    log_info_stdout("  -> Done")
    log_info_stdout("Finishing data upload...")
  end
  log_info_stdout("  -> Done")
end