Class: Taps::Operation

Inherits:
Object
  • Object
show all
Defined in:
lib/taps/operation.rb

Direct Known Subclasses

Pull, Push

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(database_url, remote_url, opts = {}) ⇒ Operation

Returns a new instance of Operation.



19
20
21
22
23
24
25
# File 'lib/taps/operation.rb', line 19

# Sets up a new operation.
#
# database_url - stored in @database_url
# remote_url   - stored in @remote_url
# opts         - option hash, stored as-is in @opts; its :session_uri entry
#                (nil when absent) seeds @session_uri
def initialize(database_url, remote_url, opts={})
	@database_url, @remote_url, @opts = database_url, remote_url, opts
	# The operation starts out in the "not exiting" state.
	@exiting = false
	@session_uri = opts[:session_uri]
end

Instance Attribute Details

#database_url ⇒ Object (readonly)

Returns the value of attribute database_url.



16
17
18
# File 'lib/taps/operation.rb', line 16

# Reader for the @database_url instance variable.
def database_url
  @database_url
end

#opts ⇒ Object (readonly)

Returns the value of attribute opts.



16
17
18
# File 'lib/taps/operation.rb', line 16

# Reader for the @opts instance variable (the option hash of this operation).
def opts
  @opts
end

#remote_url ⇒ Object (readonly)

Returns the value of attribute remote_url.



16
17
18
# File 'lib/taps/operation.rb', line 16

# Reader for the @remote_url instance variable.
def remote_url
  @remote_url
end

#session_uri ⇒ Object (readonly)

Returns the value of attribute session_uri.



17
18
19
# File 'lib/taps/operation.rb', line 17

# Reader for the @session_uri instance variable.
def session_uri
  @session_uri
end

Class Method Details

.factory(type, database_url, remote_url, opts) ⇒ Object



189
190
191
192
193
194
195
196
197
198
199
# File 'lib/taps/operation.rb', line 189

# Builds the operation object for the requested transfer type.
#
# A truthy opts[:resume] overrides +type+ with :resume; in that case the class
# to instantiate is named by opts[:klass] (the format written by #to_hash,
# e.g. "Taps::Pull").
#
# type         - :pull, :push or :resume
# database_url - passed through to the operation constructor
# remote_url   - passed through to the operation constructor
# opts         - option hash, passed through to the operation constructor
#
# Returns the new operation instance.
# Raises RuntimeError for an unknown type or an empty class name, and
# NameError when opts[:klass] does not name an existing constant.
def self.factory(type, database_url, remote_url, opts)
	type = :resume if opts[:resume]
	klass = case type
		when :pull then Taps::Pull
		when :push then Taps::Push
		when :resume
			# SECURITY: opts[:klass] is data (presumably read back from a stored
			# session file - confirm against the caller). It used to be handed to
			# eval, which runs any Ruby expression found in it. Resolve it strictly
			# as a "::"-separated constant path instead.
			names = opts[:klass].to_s.split('::').reject { |name| name.empty? }
			raise "Unknown Operation Class -> #{opts[:klass].inspect}" if names.empty?
			names.inject(Object) { |scope, name| scope.const_get(name) }
		else raise "Unknown Operation Type -> #{type}"
	end

	klass.new(database_url, remote_url, opts)
end

Instance Method Details

#apply_table_filter(tables) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/taps/operation.rb', line 39

# Restricts +tables+ to the entries whose name matches the table filter.
#
# tables - a Hash keyed by table name, or another collection of table names;
#          names are compared via #to_s
#
# Returns +tables+ itself when no filter is set. Otherwise returns a new Hash
# (for Hash input) or the result of #reject (for anything else) holding only
# the matching entries.
def apply_table_filter(tables)
	filter = table_filter
	return tables unless filter

	pattern = Regexp.new(filter)
	matches = lambda { |name| !pattern.match(name.to_s).nil? }

	return tables.reject { |name| !matches.call(name) } unless tables.kind_of?(Hash)

	tables.inject({}) do |kept, (name, data)|
		kept[name] = data if matches.call(name)
		kept
	end
end

#close_session ⇒ Object



139
140
141
# File 'lib/taps/operation.rb', line 139

# Sends a delete request (with the standard headers) to the session resource.
# Does nothing and returns nil when no session resource has been set.
def close_session
	return unless @session_resource

	@session_resource.delete(http_headers)
end

#completed_tables ⇒ Object



103
104
105
# File 'lib/taps/operation.rb', line 103

# Returns the value stored under opts[:completed_tables]. When that entry is
# missing (or falsy) an empty array is stored there and returned.
def completed_tables
	done = opts[:completed_tables]
	return done if done

	opts[:completed_tables] = []
end

#compression_disabled? ⇒ Boolean

Returns:

  • (Boolean)


115
116
117
# File 'lib/taps/operation.rb', line 115

# Returns true when opts[:disable_compression] is truthy, false otherwise.
def compression_disabled?
	opts[:disable_compression] ? true : false
end

#db ⇒ Object



119
120
121
# File 'lib/taps/operation.rb', line 119

# Returns the Sequel connection for database_url. The connection is opened on
# first use and memoized in @db.
def db
	@db ||= Sequel.connect(database_url)
end

#default_chunksize ⇒ Object



99
100
101
# File 'lib/taps/operation.rb', line 99

# Returns the :default_chunksize entry of opts.
def default_chunksize
	opts[:default_chunksize]
end

#exiting? ⇒ Boolean

Returns:

  • (Boolean)


79
80
81
# File 'lib/taps/operation.rb', line 79

# Returns true when @exiting is truthy, false otherwise.
def exiting?
	@exiting ? true : false
end

#file_prefix ⇒ Object



27
28
29
# File 'lib/taps/operation.rb', line 27

# Prefix used by store_session when naming the session file.
# NOTE(review): presumably overridden by the Pull/Push subclasses - confirm.
def file_prefix
	"op"
end

#format_number(num) ⇒ Object



165
166
167
# File 'lib/taps/operation.rb', line 165

# Formats a number for display by inserting "," between groups of three
# digits, e.g. 1234567 -> "1,234,567".
#
# num - any object; it is converted with #to_s first
#
# Only digit runs outside a fractional part are grouped: a "." followed by
# digits is copied through untouched, so 1234.5678 becomes "1,234.5678"
# rather than "1,234.5,678".
#
# Returns the formatted String.
def format_number(num)
	# First alternative swallows a fractional part unchanged; second one matches
	# a digit that must be followed by a thousands separator.
	num.to_s.gsub(/\.\d+|(\d)(?=(\d\d\d)+(?!\d))/) { $1 ? "#{$1}," : $& }
end

#http_headers(extra = {}) ⇒ Object



155
156
157
158
159
160
161
162
163
# File 'lib/taps/operation.rb', line 155

# Builds the header hash sent with requests to the taps server.
#
# extra - additional headers; merged last, so they override the defaults
#
# The defaults are :taps_version (Taps.version) and :accept_encoding, which is
# an empty string when compression is disabled and "gzip, deflate" otherwise.
#
# Returns a new Hash.
def http_headers(extra = {})
	version = Taps.version
	encoding = compression_disabled? ? "" : "gzip, deflate"
	{ :taps_version => version, :accept_encoding => encoding }.merge(extra)
end

#indexes_first? ⇒ Boolean

Returns:

  • (Boolean)


31
32
33
# File 'lib/taps/operation.rb', line 31

# Returns true when opts[:indexes_first] is truthy, false otherwise.
def indexes_first?
	opts[:indexes_first] ? true : false
end

#log ⇒ Object



55
56
57
# File 'lib/taps/operation.rb', line 55

# Convenience accessor that returns Taps.log.
def log
	Taps.log
end

#resuming? ⇒ Boolean

Returns:

  • (Boolean)


95
96
97
# File 'lib/taps/operation.rb', line 95

# Returns true only when opts[:resume] equals true.
# NOTE(review): this is a strict comparison, while the class-level factory
# treats any truthy opts[:resume] as a resume request - confirm whether the
# difference is intended.
def resuming?
	opts[:resume] == true
end

#safe_database_url ⇒ Object



151
152
153
# File 'lib/taps/operation.rb', line 151

# Returns database_url passed through safe_url, i.e. with the password part
# replaced by "[hidden]".
def safe_database_url
	safe_url(database_url)
end

#safe_remote_url ⇒ Object



147
148
149
# File 'lib/taps/operation.rb', line 147

# Returns remote_url passed through safe_url, i.e. with the password part
# replaced by "[hidden]".
def safe_remote_url
	safe_url(remote_url)
end

#safe_url(url) ⇒ Object



143
144
145
# File 'lib/taps/operation.rb', line 143

# Masks the password of a URL so it can be printed.
#
# url - String; the first "//user:password@" section (user may be empty) has
#       its password replaced by "[hidden]"
#
# Returns a new String; the input is returned unchanged (as a copy) when it
# has no such section.
def safe_url(url)
	credentials = /\/\/(.+?)?:(.*?)@/
	masked = '//\1:[hidden]@'
	url.sub(credentials, masked)
end

#server ⇒ Object



123
124
125
# File 'lib/taps/operation.rb', line 123

# Returns the RestClient::Resource for remote_url. It is created on first use
# and memoized in @server.
def server
	@server ||= RestClient::Resource.new(remote_url)
end

#session_resource ⇒ Object



127
128
129
130
131
132
# File 'lib/taps/operation.rb', line 127

# Returns the server sub-resource for the current session, memoized in
# @session_resource.
#
# When no session URI is known yet, one is obtained by posting an empty body
# to the server's 'sessions' resource; the response (as a String) becomes
# @session_uri.
def session_resource
	return @session_resource if @session_resource

	@session_uri = server['sessions'].post('', http_headers).to_s unless @session_uri
	@session_resource = server[@session_uri]
end

#set_session(uri) ⇒ Object



134
135
136
137
# File 'lib/taps/operation.rb', line 134

# Points this operation at an existing session.
#
# uri - session URI relative to the server resource
#
# Stores the URI in @session_uri (so that the session_uri reader and the hash
# built by to_hash report it) and replaces @session_resource with the matching
# server sub-resource.
#
# Returns the new session resource.
def set_session(uri)
	# Must be the instance variable: a bare `session_uri = uri` would only
	# create a local and leave the stored session URI stale.
	@session_uri = uri
	@session_resource = server[uri]
end

#setup_signal_trap ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
# File 'lib/taps/operation.rb', line 83

# Installs handlers for the INT and TERM signals. Either signal prints a
# notice and sets @exiting to true instead of terminating the process.
#
# Returns whatever trap returns for the TERM registration.
def setup_signal_trap
	finish_up = proc do
		puts "\nCompleting current action..."
		@exiting = true
	end

	trap("INT", finish_up)
	trap("TERM", finish_up)
end

#store_session ⇒ Object



59
60
61
62
63
64
65
# File 'lib/taps/operation.rb', line 59

# Writes the operation state (to_hash, serialized as JSON) to a file in the
# current directory named "<file_prefix>_<YYYYmmddHHMM>.dat", announcing the
# file name on stdout first.
def store_session
	prefix = file_prefix
	stamp = Time.now.strftime("%Y%m%d%H%M")
	file = "#{prefix}_#{stamp}.dat"
	puts "\nSaving session to #{file}.."
	File.open(file, 'w') { |out| out.write(to_hash.to_json) }
end

#stream_state ⇒ Object



107
108
109
# File 'lib/taps/operation.rb', line 107

# Returns the value stored under opts[:stream_state]. When that entry is
# missing (or falsy) an empty hash is stored there and returned.
def stream_state
	current = opts[:stream_state]
	return current if current

	opts[:stream_state] = {}
end

#stream_state=(val) ⇒ Object



111
112
113
# File 'lib/taps/operation.rb', line 111

# Stores val under opts[:stream_state], replacing any previous stream state.
def stream_state=(val)
	opts[:stream_state] = val
end

#table_filter ⇒ Object



35
36
37
# File 'lib/taps/operation.rb', line 35

# Returns the :table_filter entry of opts. apply_table_filter compiles this
# value with Regexp.new and skips filtering when it is nil or false.
def table_filter
	opts[:table_filter]
end

#to_hash ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
# File 'lib/taps/operation.rb', line 67

# Captures the state of this operation as a Hash with symbol keys:
# :klass (the class name as a String) followed by :database_url, :remote_url,
# :session_uri, :stream_state, :completed_tables and :table_filter, each taken
# from the method of the same name.
def to_hash
	state = { :klass => self.class.to_s }
	# Key order is kept stable; every remaining entry mirrors a reader method.
	[
		:database_url,
		:remote_url,
		:session_uri,
		:stream_state,
		:completed_tables,
		:table_filter,
	].each { |field| state[field] = __send__(field) }
	state
end

#verify_server ⇒ Object



169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/taps/operation.rb', line 169

# Checks that the remote taps server is reachable and usable by issuing a GET
# on '/' with the standard headers.
#
# Prints a message and exits with status 1 when
# * the credentials are rejected (RestClient::Unauthorized),
# * the server answers 417, which is reported as a taps version mismatch, or
# * the connection is refused.
# Any other RestClient::RequestFailed is re-raised to the caller.
#
# Returns the response of the GET request on success.
def verify_server
	server['/'].get(http_headers)
rescue RestClient::Unauthorized
	# Kept ahead of RequestFailed: in rest-client releases where Unauthorized
	# subclasses RequestFailed, the generic clause would otherwise swallow it
	# and re-raise instead of printing this message.
	puts "Bad credentials given for #{safe_remote_url}"
	exit(1)
rescue RestClient::RequestFailed => e
	raise unless e.http_code == 417

	puts "#{safe_remote_url} is running a different minor version of taps."
	puts e.response.to_s
	exit(1)
rescue Errno::ECONNREFUSED
	puts "Can't connect to #{safe_remote_url}. Please check that it's running"
	exit(1)
end