Class: PgSync::Sync

Inherits:
Object
  • Object
show all
Includes:
Utils
Defined in:
lib/pgsync/sync.rb

Instance Method Summary collapse

Methods included from Utils

#colorize, #config_file, #db_config_file, #log, #search_tree

Instance Method Details

#configObject



139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/pgsync/sync.rb', line 139

def config
  @config ||= begin
    if config_file
      begin
        YAML.load_file(config_file) || {}
      rescue Psych::SyntaxError => e
        raise Error, e.message
      end
    else
      {}
    end
  end
end

#confirm_tables_exist(data_source, tables, description) ⇒ Object



90
91
92
93
94
95
96
97
98
# File 'lib/pgsync/sync.rb', line 90

def confirm_tables_exist(data_source, tables, description)
  tables.keys.each do |table|
    unless data_source.table_exists?(table)
      raise Error, "Table does not exist in #{description}: #{table}"
    end
  end
ensure
  data_source.close
end

#deprecated(message) ⇒ Object



226
227
228
# File 'lib/pgsync/sync.rb', line 226

def deprecated(message)
  log colorize("[DEPRECATED] #{message}", 33) # yellow
end

#display_message(result) ⇒ Object



213
214
215
216
217
218
# File 'lib/pgsync/sync.rb', line 213

def display_message(result)
  messages = []
  messages << "- #{result[:time]}s" if result[:time]
  messages << "(#{result[:message].gsub("\n", " ").strip})" if result[:message]
  messages.join(" ")
end

#fail_sync(failed_tables) ⇒ Object

Raises:



209
210
211
# File 'lib/pgsync/sync.rb', line 209

def fail_sync(failed_tables)
  raise Error, "Sync failed for #{failed_tables.size} table#{failed_tables.size == 1 ? nil : "s"}: #{failed_tables.join(", ")}"
end

#in_parallel(tables, first_schema:, &block) ⇒ Object



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/pgsync/sync.rb', line 158

def in_parallel(tables, first_schema:, &block)
  spinners = TTY::Spinner::Multi.new(format: :dots)
  item_spinners = {}

  start = lambda do |item, i|
    table, opts = item
    message = String.new(":spinner ")
    message << table.sub("#{first_schema}.", "")
    # maybe output later
    # message << " #{opts[:sql]}" if opts[:sql]
    spinner = spinners.register(message)
    spinner.auto_spin
    item_spinners[item] = spinner
  end

  failed_tables = []

  finish = lambda do |item, i, result|
    spinner = item_spinners[item]
    table_name = item.first.sub("#{first_schema}.", "")

    if result[:status] == "success"
      spinner.success(display_message(result))
    else
      # TODO add option to fail fast
      spinner.error(display_message(result))
      failed_tables << table_name
      fail_sync(failed_tables) if @options[:fail_fast]
    end

    unless spinner.send(:tty?)
      status = result[:status] == "success" ? "✔" : "✖"
      log [status, table_name, display_message(result)].compact.join(" ")
    end
  end

  options = {start: start, finish: finish}
  if @options[:debug] || @options[:in_batches]
    options[:in_processes] = 0
  else
    options[:in_threads] = 4 if windows?
  end

  # could try to use `raise Parallel::Kill` to fail faster with --fail-fast
  # see `fast_faster` branch
  # however, need to make sure connections are cleaned up properly
  Parallel.each(tables, **options, &block)

  fail_sync(failed_tables) if failed_tables.any?
end

#log_completed(start_time) ⇒ Object



230
231
232
233
234
# File 'lib/pgsync/sync.rb', line 230

def log_completed(start_time)
  time = Time.now - start_time
  message = "Completed in #{time.round(1)}s"
  log colorize(message, 32) # green
end

#map_deprecations(args, opts) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/pgsync/sync.rb', line 100

def map_deprecations(args, opts)
  command = args[0]

  case command
  when "schema"
    args.shift
    opts[:schema_only] = true
    deprecated "Use `psync --schema-only` instead"
  when "tables"
    args.shift
    opts[:tables] = args.shift
    deprecated "Use `pgsync #{opts[:tables]}` instead"
  when "groups"
    args.shift
    opts[:groups] = args.shift
    deprecated "Use `pgsync #{opts[:groups]}` instead"
  end

  if opts[:where]
    opts[:sql] ||= String.new
    opts[:sql] << " WHERE #{opts[:where]}"
    deprecated "Use `\"WHERE #{opts[:where]}\"` instead"
  end

  if opts[:limit]
    opts[:sql] ||= String.new
    opts[:sql] << " LIMIT #{opts[:limit]}"
    deprecated "Use `\"LIMIT #{opts[:limit]}\"` instead"
  end
end

#perform(options) ⇒ Object

Raises:



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/pgsync/sync.rb', line 5

def perform(options)
  args = options.arguments
  opts = options.to_hash
  @options = opts

  # merge config
  [:to, :from, :to_safe, :exclude, :schemas].each do |opt|
    opts[opt] ||= config[opt.to_s]
  end

  # TODO remove deprecations
  map_deprecations(args, opts)

  # start
  start_time = Time.now

  if args.size > 2
    raise Error, "Usage:\n    pgsync [options]"
  end

  source = DataSource.new(opts[:from])
  raise Error, "No source" unless source.exists?

  destination = DataSource.new(opts[:to])
  raise Error, "No destination" unless destination.exists?

  begin
    # start connections
    source.host
    destination.host

    unless opts[:to_safe] || destination.local?
      raise Error, "Danger! Add `to_safe: true` to `.pgsync.yml` if the destination is not localhost or 127.0.0.1"
    end

    print_description("From", source)
    print_description("To", destination)
  ensure
    source.close
    destination.close
  end

  tables = nil
  begin
    tables = TableList.new(args, opts, source, config).tables
  ensure
    source.close
  end

  confirm_tables_exist(source, tables, "source")

  if opts[:list]
    confirm_tables_exist(destination, tables, "destination")

    list_items =
      if args[0] == "groups"
        (config["groups"] || {}).keys
      else
        tables.keys
      end

    pretty_list list_items
  else
    if opts[:schema_first] || opts[:schema_only]
      if opts[:preserve]
        raise Error, "Cannot use --preserve with --schema-first or --schema-only"
      end

      log "* Dumping schema"
      schema_tables = tables if !opts[:all_schemas] || opts[:tables] || opts[:groups] || args[0] || opts[:exclude]
      sync_schema(source, destination, schema_tables)
    end

    unless opts[:schema_only]
      confirm_tables_exist(destination, tables, "destination")

      in_parallel(tables, first_schema: source.search_path.find { |sp| sp != "pg_catalog" }) do |table, table_opts|
        TableSync.new.sync(config, table, opts.merge(table_opts), source.url, destination.url)
      end
    end

    log_completed(start_time)
  end
end

#pretty_list(items) ⇒ Object



220
221
222
223
224
# File 'lib/pgsync/sync.rb', line 220

def pretty_list(items)
  items.each do |item|
    log item
  end
end


153
154
155
156
# File 'lib/pgsync/sync.rb', line 153

def print_description(prefix, source)
  location = " on #{source.host}:#{source.port}" if source.host
  log "#{prefix}: #{source.dbname}#{location}"
end

#sync_schema(source, destination, tables = nil) ⇒ Object



131
132
133
134
135
136
137
# File 'lib/pgsync/sync.rb', line 131

def sync_schema(source, destination, tables = nil)
  dump_command = source.dump_command(tables)
  restore_command = destination.restore_command
  unless system("#{dump_command} | #{restore_command}")
    raise Error, "Schema sync returned non-zero exit code"
  end
end

#windows?Boolean

Returns:

  • (Boolean)


236
237
238
# File 'lib/pgsync/sync.rb', line 236

def windows?
  Gem.win_platform?
end