Class: Taps::Operation

Inherits:
Object
  • Object
show all
Defined in:
lib/taps/operation.rb

Direct Known Subclasses

Pull, Push

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(database_url, remote_url, opts = {}) ⇒ Operation

Returns a new instance of Operation.



20
21
22
23
24
25
26
# File 'lib/taps/operation.rb', line 20

def initialize(database_url, remote_url, opts={})
  @database_url = database_url
  @remote_url = remote_url
  @opts = opts
  @exiting = false
  @session_uri = opts[:session_uri]
end

Instance Attribute Details

#database_urlObject (readonly)

Returns the value of attribute database_url.



17
18
19
# File 'lib/taps/operation.rb', line 17

def database_url
  @database_url
end

#optsObject (readonly)

Returns the value of attribute opts.



17
18
19
# File 'lib/taps/operation.rb', line 17

def opts
  @opts
end

#remote_urlObject (readonly)

Returns the value of attribute remote_url.



17
18
19
# File 'lib/taps/operation.rb', line 17

def remote_url
  @remote_url
end

#session_uriObject (readonly)

Returns the value of attribute session_uri.



18
19
20
# File 'lib/taps/operation.rb', line 18

def session_uri
  @session_uri
end

Class Method Details

.factory(type, database_url, remote_url, opts) ⇒ Object



223
224
225
226
227
228
229
230
231
232
233
# File 'lib/taps/operation.rb', line 223

def self.factory(type, database_url, remote_url, opts)
  type = :resume if opts[:resume]
  klass = case type
    when :pull then Taps::Pull
    when :push then Taps::Push
    when :resume then eval(opts[:klass])
    else raise "Unknown Operation Type -> #{type}"
  end

  klass.new(database_url, remote_url, opts)
end

Instance Method Details

#apply_table_filter(tables) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/taps/operation.rb', line 48

def apply_table_filter(tables)
  return tables unless table_filter || exclude_tables

  re = table_filter ? Regexp.new(table_filter) : nil
  if tables.kind_of?(Hash)
    ntables = {}
    tables.each do |t, d|
      if !exclude_tables.include?(t.to_s) && (!re || !re.match(t.to_s).nil?)
        ntables[t] = d
      end
    end
    ntables
  else
    tables.reject { |t| exclude_tables.include?(t.to_s) || (re && re.match(t.to_s).nil?) }
  end
end

#catch_errors(&blk) ⇒ Object



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/taps/operation.rb', line 199

def catch_errors(&blk)
  verify_server

  begin
    blk.call
    close_session
  rescue RestClient::Exception, Taps::BaseError => e
    store_session
    if e.kind_of?(Taps::BaseError)
      puts "!!! Caught Server Exception"
      puts "#{e.class}: #{e.message}"
      puts "\n#{e.original_backtrace}" if e.original_backtrace
      exit(1)
    elsif e.respond_to?(:response)
      puts "!!! Caught Server Exception"
      puts "HTTP CODE: #{e.http_code}"
      puts "#{e.response.to_s}"
      exit(1)
    else
      raise
    end
  end
end

#close_sessionObject



149
150
151
# File 'lib/taps/operation.rb', line 149

def close_session
  @session_resource.delete(http_headers) if @session_resource
end

#completed_tablesObject



113
114
115
# File 'lib/taps/operation.rb', line 113

def completed_tables
  opts[:completed_tables] ||= []
end

#compression_disabled?Boolean

Returns:

  • (Boolean)


125
126
127
# File 'lib/taps/operation.rb', line 125

def compression_disabled?
  !!opts[:disable_compression]
end

#dbObject



129
130
131
# File 'lib/taps/operation.rb', line 129

def db
  @db ||= Sequel.connect(database_url)
end

#default_chunksizeObject



109
110
111
# File 'lib/taps/operation.rb', line 109

def default_chunksize
  opts[:default_chunksize]
end

#exclude_tablesObject



44
45
46
# File 'lib/taps/operation.rb', line 44

def exclude_tables
  opts[:exclude_tables] || []
end

#exiting?Boolean

Returns:

  • (Boolean)


89
90
91
# File 'lib/taps/operation.rb', line 89

def exiting?
  !!@exiting
end

#file_prefixObject



28
29
30
# File 'lib/taps/operation.rb', line 28

def file_prefix
  "op"
end

#format_number(num) ⇒ Object



175
176
177
# File 'lib/taps/operation.rb', line 175

def format_number(num)
  num.to_s.gsub(/(\d)(?=(\d\d\d)+(?!\d))/, "\\1,")
end

#http_headers(extra = {}) ⇒ Object



165
166
167
168
169
170
171
172
173
# File 'lib/taps/operation.rb', line 165

def http_headers(extra = {})
  base = { :taps_version => Taps.version }
  if compression_disabled?
    base[:accept_encoding] = ""
  else
    base[:accept_encoding] = "gzip, deflate"
  end
  base.merge(extra)
end

#indexes_first?Boolean

Returns:

  • (Boolean)


36
37
38
# File 'lib/taps/operation.rb', line 36

def indexes_first?
  !!opts[:indexes_first]
end

#logObject



65
66
67
# File 'lib/taps/operation.rb', line 65

def log
  Taps.log
end

#resuming?Boolean

Returns:

  • (Boolean)


105
106
107
# File 'lib/taps/operation.rb', line 105

def resuming?
  opts[:resume] == true
end

#safe_database_urlObject



161
162
163
# File 'lib/taps/operation.rb', line 161

def safe_database_url
  safe_url(database_url)
end

#safe_remote_urlObject



157
158
159
# File 'lib/taps/operation.rb', line 157

def safe_remote_url
  safe_url(remote_url)
end

#safe_url(url) ⇒ Object



153
154
155
# File 'lib/taps/operation.rb', line 153

def safe_url(url)
  url.sub(/\/\/(.+?)?:(.*?)@/, '//\1:[hidden]@')
end

#serverObject



133
134
135
# File 'lib/taps/operation.rb', line 133

def server
  @server ||= RestClient::Resource.new(remote_url)
end

#session_resourceObject



137
138
139
140
141
142
# File 'lib/taps/operation.rb', line 137

def session_resource
  @session_resource ||= begin
    @session_uri ||= server['sessions'].post('', http_headers).to_s
    server[@session_uri]
  end
end

#set_session(uri) ⇒ Object



144
145
146
147
# File 'lib/taps/operation.rb', line 144

def set_session(uri)
  session_uri = uri
  @session_resource = server[session_uri]
end

#setup_signal_trapObject



93
94
95
96
97
98
99
100
101
102
103
# File 'lib/taps/operation.rb', line 93

def setup_signal_trap
  trap("INT") {
    puts "\nCompleting current action..."
    @exiting = true
  }

  trap("TERM") {
    puts "\nCompleting current action..."
    @exiting = true
  }
end

#skip_schema?Boolean

Returns:

  • (Boolean)


32
33
34
# File 'lib/taps/operation.rb', line 32

def skip_schema?
  !!opts[:skip_schema]
end

#store_sessionObject



69
70
71
72
73
74
75
# File 'lib/taps/operation.rb', line 69

def store_session
  file = "#{file_prefix}_#{Time.now.strftime("%Y%m%d%H%M")}.dat"
  puts "\nSaving session to #{file}.."
  File.open(file, 'w') do |f|
    f.write(OkJson.encode(to_hash))
  end
end

#stream_stateObject



117
118
119
# File 'lib/taps/operation.rb', line 117

def stream_state
  opts[:stream_state] ||= {}
end

#stream_state=(val) ⇒ Object



121
122
123
# File 'lib/taps/operation.rb', line 121

def stream_state=(val)
  opts[:stream_state] = val
end

#table_filterObject



40
41
42
# File 'lib/taps/operation.rb', line 40

def table_filter
  opts[:table_filter]
end

#to_hashObject



77
78
79
80
81
82
83
84
85
86
87
# File 'lib/taps/operation.rb', line 77

def to_hash
  {
    :klass => self.class.to_s,
    :database_url => database_url,
    :remote_url => remote_url,
    :session_uri => session_uri,
    :stream_state => stream_state,
    :completed_tables => completed_tables,
    :table_filter => table_filter,
  }
end

#verify_serverObject



179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/taps/operation.rb', line 179

def verify_server
  begin
    server['/'].get(http_headers)
  rescue RestClient::RequestFailed => e
    if e.http_code == 417
      puts "#{safe_remote_url} is running a different minor version of taps."
      puts "#{e.response.to_s}"
      exit(1)
    else
      raise
    end
  rescue RestClient::Unauthorized
    puts "Bad credentials given for #{safe_remote_url}"
    exit(1)
  rescue Errno::ECONNREFUSED
    puts "Can't connect to #{safe_remote_url}. Please check that it's running"
    exit(1)
  end
end