Class: Flydata::Command::Sync

Inherits:
Base
  • Object
show all
Includes:
Helpers
Defined in:
lib/flydata/command/sync.rb

Constant Summary collapse

CREATE_TABLE_OPTION =
!!(ENV['FLYDATA_CREATE_TABLE_OPTION']) || false
INSERT_PROGRESS_INTERVAL =
1000
STATUS_PARSING =

for dump.pos file

'PARSING'
STATUS_COMPLETE =
'COMPLETE'

Instance Attribute Summary

Attributes inherited from Base

#opts

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Helpers

development?, env_mode, env_suffix, flydata_api_host_file, flydata_conf_file, format_menu_list, parse_command, retry_on, to_command_class, usage_text

Methods inherited from Base

#ask_input_table_name, #ask_yes_no, #choose_one, #flydata, #initialize, #newline, #register_crontab, #retrieve_data_entries, #separator

Constructor Details

This class inherits a constructor from Flydata::Command::Base

Class Method Details

.slopObject



21
22
23
24
25
# File 'lib/flydata/command/sync.rb', line 21

def self.slop
  Slop.new do
    on 'c', 'skip-cleanup', 'Skip server cleanup'
  end
end

.slop_generate_table_ddlObject



134
135
136
137
138
# File 'lib/flydata/command/sync.rb', line 134

def self.slop_generate_table_ddl
  Slop.new do
    on 'c', 'ctl-only', 'Only generate FlyData Control definitions'
  end
end

.slop_resetObject



50
51
52
53
54
# File 'lib/flydata/command/sync.rb', line 50

def self.slop_reset
  Slop.new do
    on 'c', 'client', 'Resets client only.'
  end
end

Instance Method Details

#checkObject



111
112
113
114
115
116
117
118
119
120
121
# File 'lib/flydata/command/sync.rb', line 111

def check
  de = retrieve_data_entry
  retry_on(RestClient::Exception) do
    status = do_check(de)
    if status['complete']
      nil
    else
      status
    end
  end
end

#flushObject



45
46
47
48
# File 'lib/flydata/command/sync.rb', line 45

def flush
  flush_buffer_and_stop
  puts "Buffers have been flushed and the sender process has been stopped."
end

#generate_table_ddl(*tables) ⇒ Object



140
141
142
143
144
# File 'lib/flydata/command/sync.rb', line 140

def generate_table_ddl(*tables)
  de = retrieve_data_entry
  Flydata::Mysql::CompatibilityCheck.new(de['mysql_data_entry_preference']).check
  do_generate_table_ddl(override_tables(de, tables))
end

#reset(*tables) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/flydata/command/sync.rb', line 56

def reset(*tables)
  msg = tables.empty? ? '' : " for these tables : #{tables.join(" ")}"
  return unless ask_yes_no("This resets the current sync#{msg}.  Are you sure?")
  sender = Flydata::Command::Sender.new
  sender.flush_client_buffer # TODO We should rather delete buffer files
  sender.stop

  de = retrieve_data_entry
  wait_for_server_buffer
  cleanup_sync_server(de, tables) unless opts.client?
  sync_fm = Flydata::FileUtil::SyncFileManager.new(de)
  delete_files = [
    sync_fm.dump_file_path,
    sync_fm.dump_pos_path,
    sync_fm.mysql_table_marshal_dump_path,
    sync_fm.sync_info_file,
    sync_fm.table_position_file_paths(*tables),
    sync_fm.table_rev_file_paths(*tables)
  ]
  delete_files << sync_fm.binlog_path if tables.empty?
  delete_files.flatten.each do |path|
    FileUtils.rm(path) if File.exists?(path)
  end
end

#run(*tables) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/flydata/command/sync.rb', line 27

def run(*tables)
  sender = Flydata::Command::Sender.new
  if (sender.process_exist?)
    if tables.empty?
      # full sync
      puts "FlyData Agent is already running.  If you'd like to restart FlyData Sync from scratch, run 'flydata sync:reset' first."
    else
      # per-table sync
      puts "Flydata Agent is already running.  If you'd like to Sync the table(s), run 'flydata sync:flush' first."
    end
    exit 1
  end
  de = retrieve_data_entry
  de = load_sync_info(override_tables(de, tables))
  flush_buffer_and_stop  unless de['mysql_data_entry_preference']['initial_sync']
  sync_mysql_to_redshift(de)
end

#skipObject

skip initial sync



124
125
126
127
128
129
130
131
132
# File 'lib/flydata/command/sync.rb', line 124

def skip
  de = retrieve_data_entry
  sync_fm = Flydata::FileUtil::SyncFileManager.new(de)
  binlog_path = sync_fm.binlog_path
  `touch #{binlog_path}`
  puts "Created an empty binlog position file."
  puts "-> #{binlog_path}"
  puts "Run 'flydata start' to start continuous sync."
end

#wait_for_server_bufferObject



81
82
83
84
85
86
87
# File 'lib/flydata/command/sync.rb', line 81

def wait_for_server_buffer
  puts "Waiting for the server buffer to get empty"
  while (status = check) && (status['state'] == 'processing')
    print_progress(status)
    sleep 10
  end
end

#wait_for_server_data_processingObject



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/flydata/command/sync.rb', line 89

def wait_for_server_data_processing
  state = :PROCESS
  puts "Uploading data to Redshift..."
  sleep 10
  status = nil
  while (status = check)
    if state == :PROCESS && status['state'] == 'uploading'
      puts "  -> Done"
      state = :UPLOAD
      puts "Finishing data upload..."
    end
    print_progress(status)
    sleep 10
  end
  if (state == :PROCESS)
    # :UPLOAD state was skipped due to no data
    puts "  -> Done"
    puts "Finishing data upload..."
  end
  puts "  -> Done"
end