Class: Flydata::Command::Sync

Inherits:
Base
  • Object
show all
Includes:
Helpers
Defined in:
lib/flydata/command/sync.rb

Constant Summary collapse

CREATE_TABLE_OPTION =
!!(ENV['FLYDATA_CREATE_TABLE_OPTION']) || false
INSERT_PROGRESS_INTERVAL =
1000
STATUS_PARSING =

Status value used in the dump.pos file.

'PARSING'
STATUS_COMPLETE =
'COMPLETE'

Instance Attribute Summary

Attributes inherited from Base

#opts

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Helpers

development?, env_mode, env_suffix, flydata_api_host_file, flydata_conf_file, format_menu_list, parse_command, retry_on, to_command_class, usage_text

Methods inherited from Base

#ask_input_table_name, #ask_yes_no, #choose_one, #flydata, #initialize, #newline, #register_crontab, #retrieve_data_entries, #separator

Constructor Details

This class inherits a constructor from Flydata::Command::Base

Class Method Details

.slopObject



21
22
23
24
25
26
# File 'lib/flydata/command/sync.rb', line 21

# Builds a Slop option parser declaring the `c` / `skip-cleanup` and
# `y` / `yes` switches.
#
# @return [Slop] the configured parser
def self.slop
  # Each row is passed to Slop's `on` as (short flag, long flag, help text).
  option_specs = [
    ['c', 'skip-cleanup', 'Skip server cleanup'],
    ['y', 'yes', 'Skip command prompt assuming yes to all questions.  Use this for batch operation.']
  ]
  Slop.new { option_specs.each { |spec| on(*spec) } }
end

.slop_flushObject



46
47
48
49
50
# File 'lib/flydata/command/sync.rb', line 46

# Builds a Slop option parser declaring only the `y` / `yes` switch.
#
# @return [Slop] the configured parser
def self.slop_flush
  yes_help = 'Skip command prompt assuming yes to all questions.  Use this for batch operation.'
  Slop.new { on('y', 'yes', yes_help) }
end

.slop_generate_table_ddlObject



142
143
144
145
146
147
# File 'lib/flydata/command/sync.rb', line 142

# Builds a Slop option parser declaring the `c` / `ctl-only` and
# `y` / `yes` switches.
#
# @return [Slop] the configured parser
def self.slop_generate_table_ddl
  # Each row is passed to Slop's `on` as (short flag, long flag, help text).
  option_specs = [
    ['c', 'ctl-only', 'Only generate FlyData Control definitions'],
    ['y', 'yes', 'Skip command prompt assuming yes to all questions.  Use this for batch operation.']
  ]
  Slop.new { option_specs.each { |spec| on(*spec) } }
end

.slop_resetObject



57
58
59
60
61
62
# File 'lib/flydata/command/sync.rb', line 57

# Builds a Slop option parser declaring the `c` / `client` and
# `y` / `yes` switches.
#
# @return [Slop] the configured parser
def self.slop_reset
  # Each row is passed to Slop's `on` as (short flag, long flag, help text).
  option_specs = [
    ['c', 'client', 'Resets client only.'],
    ['y', 'yes', 'Skip command prompt assuming yes to all questions.  Use this for batch operation.']
  ]
  Slop.new { option_specs.each { |spec| on(*spec) } }
end

Instance Method Details

#checkObject



119
120
121
122
123
124
125
126
127
128
129
# File 'lib/flydata/command/sync.rb', line 119

# Looks up the data entry and asks +do_check+ for its status, with the call
# wrapped in +retry_on(RestClient::Exception)+.
#
# The block evaluates to +nil+ once the status has a truthy 'complete' value
# and to the status itself otherwise, so callers can loop while the result
# is truthy.
#
# @return [Object, nil] whatever +retry_on+ returns (presumably the block's
#   value; +retry_on+ is defined outside this listing)
def check
  entry = retrieve_data_entry
  retry_on(RestClient::Exception) do
    progress = do_check(entry)
    progress unless progress['complete']
  end
end

#flushObject



52
53
54
55
# File 'lib/flydata/command/sync.rb', line 52

# Runs +flush_buffer_and_stop+ and then prints a confirmation message.
#
# @return [nil]
def flush
  flush_buffer_and_stop
  done_message = "Buffers have been flushed and the sender process has been stopped."
  puts done_message
end

#generate_table_ddl(*tables) ⇒ Object



149
150
151
152
153
# File 'lib/flydata/command/sync.rb', line 149

# Runs the MySQL compatibility check for the current data entry and then
# generates table DDL for it.
#
# @param tables [Array] table names handed to +override_tables+ together
#   with the data entry
# @return [Object] whatever +do_generate_table_ddl+ returns
def generate_table_ddl(*tables)
  entry = retrieve_data_entry
  mysql_pref = entry['mysql_data_entry_preference']
  # The compatibility check runs before anything is generated.
  checker = Flydata::Mysql::CompatibilityCheck.new(mysql_pref)
  checker.check
  scoped_entry = override_tables(entry, tables)
  do_generate_table_ddl(scoped_entry)
end

#reset(*tables) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/flydata/command/sync.rb', line 64

# Resets the current sync, optionally limited to the given tables.
#
# After the user confirms, this flushes the client buffer and stops the
# sender, waits for the server buffer, cleans up the server side (skipped
# when the `client` option is set) and finally removes the local sync
# state files that exist.
#
# @param tables [Array] table names to reset; when empty the binlog file is
#   removed as well
# @return [void]
def reset(*tables)
  msg = tables.empty? ? '' : " for these tables : #{tables.join(" ")}"
  return unless ask_yes_no("This resets the current sync#{msg}.  Are you sure?")

  sender = Flydata::Command::Sender.new
  sender.flush_client_buffer # TODO We should rather delete buffer files
  sender.stop

  de = retrieve_data_entry
  wait_for_server_buffer
  cleanup_sync_server(de, tables) unless opts.client?

  sync_fm = Flydata::FileUtil::SyncFileManager.new(de)
  delete_files = [
    sync_fm.dump_file_path,
    sync_fm.dump_pos_path,
    sync_fm.mysql_table_marshal_dump_path,
    sync_fm.sync_info_file,
    sync_fm.table_position_file_paths(*tables),
    sync_fm.table_rev_file_paths(*tables)
  ]
  # The binlog file is only dropped on a full reset, not a per-table one.
  delete_files << sync_fm.binlog_path if tables.empty?

  # The per-table helpers return lists, hence the flatten.
  delete_files.flatten.each do |path|
    # File.exist? rather than the File.exists? alias: the alias was
    # deprecated and then removed in Ruby 3.2.
    FileUtils.rm(path) if File.exist?(path)
  end
end

#run(*tables) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/flydata/command/sync.rb', line 28

# Starts a sync, either for everything or for the given tables.
#
# If the sender process already exists, a hint is printed and the process
# exits with status 1. Otherwise the data entry is loaded (with the table
# override applied), buffers are flushed unless the entry's 'initial_sync'
# preference is set, and the MySQL to Redshift sync is started.
#
# @param tables [Array] table names; empty means a full sync
# @return [Object] whatever +sync_mysql_to_redshift+ returns
def run(*tables)
  if Flydata::Command::Sender.new.process_exist?
    already_running_hint =
      if tables.empty?
        # full sync
        "FlyData Agent is already running.  If you'd like to restart FlyData Sync from scratch, run 'flydata sync:reset' first."
      else
        # per-table sync
        "Flydata Agent is already running.  If you'd like to Sync the table(s), run 'flydata sync:flush' first."
      end
    puts already_running_hint
    exit 1
  end

  entry = load_sync_info(override_tables(retrieve_data_entry, tables))
  flush_buffer_and_stop unless entry['mysql_data_entry_preference']['initial_sync']
  sync_mysql_to_redshift(entry)
end

#skipObject

Skips the initial sync by creating an empty binlog position file.



132
133
134
135
136
137
138
139
140
# File 'lib/flydata/command/sync.rb', line 132

# Skips the initial sync by touching the binlog position file of the
# current data entry, then tells the user how to start continuous sync.
#
# @return [nil]
# @raise [SystemCallError] if the file cannot be created or updated
def skip
  de = retrieve_data_entry
  sync_fm = Flydata::FileUtil::SyncFileManager.new(de)
  binlog_path = sync_fm.binlog_path
  # FileUtils.touch instead of shelling out to `touch`: the path is not
  # subject to shell word-splitting (spaces, metacharacters), and a failure
  # raises rather than being silently followed by a "Created" message.
  FileUtils.touch(binlog_path)
  puts "Created an empty binlog position file."
  puts "-> #{binlog_path}"
  puts "Run 'flydata start' to start continuous sync."
end

#wait_for_server_bufferObject



89
90
91
92
93
94
95
# File 'lib/flydata/command/sync.rb', line 89

# Polls +check+ until it returns a falsy value or a status whose 'state' is
# no longer 'processing', printing progress and sleeping 10 seconds between
# polls.
#
# @return [nil]
def wait_for_server_buffer
  puts "Waiting for the server buffer to get empty"
  progress = check
  while progress && progress['state'] == 'processing'
    print_progress(progress)
    sleep 10
    progress = check
  end
end

#wait_for_server_data_processingObject



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/flydata/command/sync.rb', line 97

# Polls +check+ every 10 seconds until it returns a falsy value, printing
# progress along the way.
#
# The output has two phases: "Uploading data to Redshift..." and
# "Finishing data upload...". The second phase is announced the first time
# a status with 'state' == 'uploading' is seen, or after the loop if no such
# status ever appeared.
#
# @return [nil]
def wait_for_server_data_processing
  finishing_announced = false
  announce_finishing = lambda do
    puts "  -> Done"
    puts "Finishing data upload..."
    finishing_announced = true
  end

  puts "Uploading data to Redshift..."
  sleep 10

  progress = check
  while progress
    announce_finishing.call if !finishing_announced && progress['state'] == 'uploading'
    print_progress(progress)
    sleep 10
    progress = check
  end

  # The 'uploading' state was never observed (no data), so close the first
  # phase here.
  announce_finishing.call unless finishing_announced
  puts "  -> Done"
end