Class: Tapsoob::Operation::Base
- Inherits: Object
  - Object
    - Tapsoob::Operation::Base
- Defined in:
- lib/tapsoob/operation/base.rb
Instance Attribute Summary
-
#database_url ⇒ Object
readonly
Returns the value of attribute database_url.
-
#dump_path ⇒ Object
readonly
Returns the value of attribute dump_path.
-
#opts ⇒ Object
readonly
Returns the value of attribute opts.
Class Method Summary
- .factory(type, database_url, dump_path, opts) ⇒ Object
Instance Method Summary
- #add_completed_table(table_name) ⇒ Object
- #apply_table_filter(tables) ⇒ Object
-
#can_use_pk_partitioning?(table_name) ⇒ Boolean
Check if table can use efficient PK-based partitioning.
- #catch_errors(&blk) ⇒ Object
- #completed_tables ⇒ Object
- #completed_tables_mutex ⇒ Object
- #data? ⇒ Boolean
- #db ⇒ Object
- #default_chunksize ⇒ Object
- #exclude_tables ⇒ Object
- #exiting? ⇒ Boolean
- #file_prefix ⇒ Object
- #format_number(num) ⇒ Object
- #indexes_first? ⇒ Boolean
-
#initialize(database_url, dump_path = nil, opts = {}) ⇒ Base
constructor
A new instance of Base.
- #load_table_order ⇒ Object
- #log ⇒ Object
- #parallel? ⇒ Boolean
- #parallel_workers ⇒ Object
- #resuming? ⇒ Boolean
- #save_table_order(table_names) ⇒ Object
- #schema? ⇒ Boolean
- #setup_signal_trap ⇒ Object
- #store_session ⇒ Object
- #stream_state ⇒ Object
- #stream_state=(val) ⇒ Object
- #table_filter ⇒ Object
-
#table_parallel_workers(table_name, row_count) ⇒ Object
Auto-detect number of workers for intra-table parallelization.
- #to_hash ⇒ Object
Constructor Details
#initialize(database_url, dump_path = nil, opts = {}) ⇒ Base
Returns a new instance of Base.
17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/tapsoob/operation/base.rb', line 17
# Builds an operation bound to a database and an optional dump directory.
#
# @param database_url [String] connection string (later handed to Sequel.connect by #db)
# @param dump_path [String, nil] dump directory; nil means output is piped
# @param opts [Hash] operation options, exposed through #opts
def initialize(database_url, dump_path = nil, opts = {})
  @database_url, @dump_path, @opts = database_url, dump_path, opts
  @exiting = false

  # JSON progress events are only wanted when the visual progress bars are
  # off (--progress=false) AND we are not piping (a dump_path was given);
  # otherwise they would just be STDERR noise.
  Tapsoob::ProgressEvent.enabled = !(opts[:progress] || dump_path.nil?)
end
Instance Attribute Details
#database_url ⇒ Object (readonly)
Returns the value of attribute database_url.
15 16 17 |
# File 'lib/tapsoob/operation/base.rb', line 15
# Read-only accessor for the connection string given to #initialize.
#
# @return [Object] the stored database URL
def database_url
  @database_url
end
#dump_path ⇒ Object (readonly)
Returns the value of attribute dump_path.
15 16 17 |
# File 'lib/tapsoob/operation/base.rb', line 15
# Read-only accessor for the dump directory given to #initialize
# (nil when output is piped).
#
# @return [String, nil]
def dump_path
  @dump_path
end
#opts ⇒ Object (readonly)
Returns the value of attribute opts.
15 16 17 |
# File 'lib/tapsoob/operation/base.rb', line 15
# Read-only accessor for the options Hash given to #initialize. The Hash
# itself is returned (not a copy), so callers may mutate it in place.
#
# @return [Hash]
def opts
  @opts
end
Class Method Details
.factory(type, database_url, dump_path, opts) ⇒ Object
227 228 229 230 231 232 233 234 235 236 237 |
# File 'lib/tapsoob/operation/base.rb', line 227
# Instantiates the operation class matching +type+.
#
# When opts[:resume] is set the class comes from opts[:klass] (the class name
# that #to_hash records in a saved session file). Because that value is read
# from a file on disk, it is resolved as a constant name and must name a
# Tapsoob::Operation::Base subclass; it is never evaluated as Ruby code.
#
# @param type [Symbol] :pull or :push (forced to :resume when opts[:resume])
# @param database_url [String]
# @param dump_path [String, nil]
# @param opts [Hash]
# @return [Tapsoob::Operation::Base] a new operation instance
# @raise [RuntimeError] for an unknown +type+ or a klass that is not an operation class
# @raise [NameError] when opts[:klass] does not name a defined constant
def self.factory(type, database_url, dump_path, opts)
  type = :resume if opts[:resume]
  klass = case type
          when :pull then Tapsoob::Operation::Pull
          when :push then Tapsoob::Operation::Push
          when :resume
            resolved = Object.const_get(opts[:klass].to_s)
            unless resolved.is_a?(Class) && resolved <= Tapsoob::Operation::Base
              raise "Invalid Operation Class -> #{opts[:klass]}"
            end
            resolved
          else raise "Unknown Operation Type -> #{type}"
          end
  klass.new(database_url, dump_path, opts)
end
Instance Method Details
#add_completed_table(table_name) ⇒ Object
191 192 193 194 195 |
# File 'lib/tapsoob/operation/base.rb', line 191
# Appends +table_name+, converted to a String, to #completed_tables while
# holding #completed_tables_mutex.
#
# @param table_name [#to_s]
# @return [Array<String>] the updated list
def add_completed_table(table_name)
  completed_tables_mutex.synchronize do
    finished = completed_tables
    finished.push(table_name.to_s)
  end
end
#apply_table_filter(tables) ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/tapsoob/operation/base.rb', line 54
# Applies the include list (#table_filter) and the exclude list
# (#exclude_tables) to +tables+, comparing table names as strings.
#
# A table is kept when it is not excluded AND either no include list was
# given or the include list names it. An exclude list on its own therefore
# removes only the excluded tables.
#
# @param tables [Hash, Enumerable] table names, or a Hash keyed by table name
# @return [Hash, Array] +tables+ itself when neither list is set; otherwise a
#   new filtered Hash (for Hash input) or Array
def apply_table_filter(tables)
  included = table_filter
  excluded = exclude_tables
  return tables if included.empty? && excluded.empty?

  keep = lambda do |name|
    name = name.to_s
    !excluded.include?(name) && (included.empty? || included.include?(name))
  end

  if tables.kind_of?(Hash)
    tables.select { |name, _data| keep.call(name) }
  else
    tables.select { |name| keep.call(name) }
  end
end
#can_use_pk_partitioning?(table_name) ⇒ Boolean
Check if table can use efficient PK-based partitioning
183 184 185 |
# File 'lib/tapsoob/operation/base.rb', line 183
# Whether +table_name+ qualifies for efficient PK-range partitioning.
# Delegates to Tapsoob::Utils.single_integer_primary_key with the #db
# connection and the table name as a Symbol; that helper's return value is
# passed through unchanged (presumably truthy when the table has a single
# integer primary key -- confirm in Tapsoob::Utils).
#
# @param table_name [#to_sym]
def can_use_pk_partitioning?(table_name)
  connection = db
  Tapsoob::Utils.single_integer_primary_key(connection, table_name.to_sym)
end
#catch_errors(&blk) ⇒ Object
219 220 221 222 223 224 225 |
# File 'lib/tapsoob/operation/base.rb', line 219
# Calls the given block and returns its result.
#
# Exceptions raised by the block propagate unchanged. There is deliberately
# no `rescue Exception` here: rescuing and immediately re-raising every
# Exception (including Interrupt and SystemExit) adds nothing.
#
# @yield the work to run
# @return [Object] the block's return value
def catch_errors(&blk)
  blk.call
end
#completed_tables ⇒ Object
117 118 119 |
# File 'lib/tapsoob/operation/base.rb', line 117
# Names of tables already finished. The list lives in opts[:completed_tables],
# so it is included in #to_hash. It is initialized to a fresh [] when the key
# is missing or holds nil/false.
#
# @return [Array<String>]
def completed_tables
  opts[:completed_tables] = [] unless opts[:completed_tables]
  opts[:completed_tables]
end
#completed_tables_mutex ⇒ Object
187 188 189 |
# File 'lib/tapsoob/operation/base.rb', line 187
# Mutex used by #add_completed_table. It is created on first use and reused
# afterwards.
#
# NOTE(review): the lazy creation is itself unsynchronized. If two threads
# call this concurrently before the Mutex exists, each could receive its own
# instance. Consider creating it in #initialize, before any worker threads
# are started.
#
# @return [Mutex]
def completed_tables_mutex
  return @completed_tables_mutex if @completed_tables_mutex

  @completed_tables_mutex = Mutex.new
end
#data? ⇒ Boolean
34 35 36 |
# File 'lib/tapsoob/operation/base.rb', line 34
# True when opts[:data] is set (presumably: transfer table data).
# Coerced to a strict boolean, consistent with #indexes_first? and #exiting?;
# truthiness is unchanged.
#
# @return [Boolean]
def data?
  !!opts[:data]
end
#db ⇒ Object
129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/tapsoob/operation/base.rb', line 129
# Lazily opens and memoizes the Sequel connection for this operation.
#
# The pool is sized at twice #parallel_workers. One-time setup runs only
# when the connection is created, so repeated calls do not stack duplicate
# debug loggers or re-issue the Oracle ALTER SESSION statements. That setup
# is: loading the schema_dumper extension, attaching Tapsoob.log when
# opts[:debug] is set, and setting the Oracle NLS date/timestamp formats.
#
# NOTE(review): ALTER SESSION affects only the pooled connection that runs it.
# Other pooled connections (e.g. those used by parallel workers) may not get
# these NLS formats. Consider Sequel's :connect_sqls / :after_connect options.
#
# @return [Sequel::Database]
def db
  return @db if @db

  conn = Sequel.connect(database_url, max_connections: parallel_workers * 2)
  conn.extension :schema_dumper
  conn.loggers << Tapsoob.log if opts[:debug]

  # Normalize how Oracle renders DATE/TIMESTAMP values as strings.
  if conn.uri =~ /oracle/i
    conn << "ALTER SESSION SET NLS_DATE_FORMAT='YYYY-MM-DD HH24:MI:SS'"
    conn << "ALTER SESSION SET NLS_TIMESTAMP_FORMAT='YYYY-MM-DD HH24:MI:SS:FF6'"
  end

  # Publish only after setup succeeded, so a failed setup is retried next call.
  @db = conn
end
#default_chunksize ⇒ Object
113 114 115 |
# File 'lib/tapsoob/operation/base.rb', line 113
# Value of opts[:default_chunksize], returned as-is (nil when not set).
#
# @return [Object, nil]
def default_chunksize
  opts[:default_chunksize]
end
#exclude_tables ⇒ Object
50 51 52 |
# File 'lib/tapsoob/operation/base.rb', line 50
# Table names to skip (opts[:exclude_tables]). A new empty Array is
# returned when the option is missing or nil/false.
#
# @return [Array]
def exclude_tables
  excluded = opts[:exclude_tables]
  return [] unless excluded

  excluded
end
#exiting? ⇒ Boolean
93 94 95 |
# File 'lib/tapsoob/operation/base.rb', line 93
# True once a graceful stop has been requested. @exiting starts false in
# #initialize and is set by the INT/TERM handlers from #setup_signal_trap.
#
# @return [Boolean]
def exiting?
  @exiting ? true : false
end
#file_prefix ⇒ Object
30 31 32 |
# File 'lib/tapsoob/operation/base.rb', line 30
# Prefix used by #store_session when naming session files. Subclasses
# presumably override it.
#
# @return [String]
def file_prefix
  "op"
end
#format_number(num) ⇒ Object
197 198 199 |
# File 'lib/tapsoob/operation/base.rb', line 197
# Inserts thousands separators, e.g. 1234567 => "1,234,567".
#
# Only integer digit runs are grouped: a run that directly follows a decimal
# point is left alone, so 1234.5678 becomes "1,234.5678" rather than
# "1,234.5,678".
#
# @param num [#to_s] a number (or a string containing numbers)
# @return [String]
def format_number(num)
  num.to_s.gsub(/(?<![.\d])\d+/) do |digits|
    digits.gsub(/(\d)(?=(?:\d{3})+\z)/, '\1,')
  end
end
#indexes_first? ⇒ Boolean
42 43 44 |
# File 'lib/tapsoob/operation/base.rb', line 42
# True when opts[:indexes_first] is truthy, returned as a strict boolean.
#
# @return [Boolean]
def indexes_first?
  opts[:indexes_first] ? true : false
end
#load_table_order ⇒ Object
210 211 212 213 214 215 216 217 |
# File 'lib/tapsoob/operation/base.rb', line 210
# Reads the table order previously written by #save_table_order.
#
# @return [Array<String>, nil] whitespace-stripped, non-blank table names
#   from <dump_path>/table_order.txt; nil when there is no dump_path or
#   the file does not exist
def load_table_order
  return nil unless dump_path

  order_file = File.join(dump_path, "table_order.txt")
  return nil unless File.exist?(order_file)

  File.readlines(order_file).map(&:strip).reject(&:empty?)
end
#log ⇒ Object
70 71 72 73 |
# File 'lib/tapsoob/operation/base.rb', line 70
# Returns the shared Tapsoob logger. When opts[:debug] is set, its level is
# raised to Logger::DEBUG first (on every call).
#
# @return [Logger]
def log
  if opts[:debug]
    Tapsoob.log.level = Logger::DEBUG
  end
  Tapsoob.log
end
#parallel? ⇒ Boolean
143 144 145 |
# File 'lib/tapsoob/operation/base.rb', line 143
# True when more than one worker is configured (see #parallel_workers).
#
# @return [Boolean]
def parallel?
  parallel_workers >= 2
end
#parallel_workers ⇒ Object
147 148 149 |
# File 'lib/tapsoob/operation/base.rb', line 147
# Number of workers requested via opts[:parallel] (converted with #to_i),
# never less than 1. The value is memoized on first call.
#
# @return [Integer]
def parallel_workers
  @parallel_workers ||= begin
    requested = opts[:parallel].to_i
    requested < 1 ? 1 : requested
  end
end
#resuming? ⇒ Boolean
109 110 111 |
# File 'lib/tapsoob/operation/base.rb', line 109
# True only when opts[:resume] is exactly +true+ (other truthy values such
# as "true" or 1 do not count).
#
# @return [Boolean]
def resuming?
  opts[:resume].equal?(true)
end
#save_table_order(table_names) ⇒ Object
201 202 203 204 205 206 207 208 |
# File 'lib/tapsoob/operation/base.rb', line 201
# Writes +table_names+, one per line, to <dump_path>/table_order.txt,
# replacing any existing file. Does nothing when there is no dump_path.
#
# @param table_names [Enumerable<#to_s>]
# @return [nil, Object] nil when skipped
def save_table_order(table_names)
  return unless dump_path

  order_file = File.join(dump_path, "table_order.txt")
  File.open(order_file, 'w') do |file|
    table_names.each { |table| file.puts(table) }
  end
end
#schema? ⇒ Boolean
38 39 40 |
# File 'lib/tapsoob/operation/base.rb', line 38
# True when opts[:schema] is set (presumably: transfer the schema).
# Coerced to a strict boolean, consistent with #indexes_first? and #exiting?;
# truthiness is unchanged.
#
# @return [Boolean]
def schema?
  !!opts[:schema]
end
#setup_signal_trap ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/tapsoob/operation/base.rb', line 97
# Installs identical INT and TERM handlers that request a graceful stop:
# each prints a notice and sets @exiting (read by #exiting?) so the current
# action can complete.
#
# The notice goes to $stderr. When no dump_path is given, JSON is written
# directly to STDOUT, and a message there would corrupt the piped stream.
def setup_signal_trap
  %w[INT TERM].each do |signal|
    trap(signal) do
      $stderr.puts "\nCompleting current action..."
      @exiting = true
    end
  end
end
#store_session ⇒ Object
75 76 77 78 79 80 81 |
# File 'lib/tapsoob/operation/base.rb', line 75
# Saves #to_hash as JSON to "<file_prefix>_<YYYYMMDDHHMM>.dat" in the
# current working directory, logging the file name first.
#
# @return [Integer] number of bytes written
def store_session
  prefix = file_prefix
  session_file = "#{prefix}_#{Time.now.strftime("%Y%m%d%H%M")}.dat"
  log.info "\nSaving session to #{session_file}..."
  File.open(session_file, 'w') { |io| io.write(JSON.generate(to_hash)) }
end
#stream_state ⇒ Object
121 122 123 |
# File 'lib/tapsoob/operation/base.rb', line 121
# Stream progress state kept in opts[:stream_state] (and thus included in
# #to_hash). It is initialized to a fresh {} when missing or nil/false.
#
# @return [Hash]
def stream_state
  opts[:stream_state] || (opts[:stream_state] = {})
end
#stream_state=(val) ⇒ Object
125 126 127 |
# File 'lib/tapsoob/operation/base.rb', line 125
# Replaces opts[:stream_state] with +val+ (stored as given, nil included).
#
# @param val [Hash, nil]
def stream_state=(val)
  opts.store(:stream_state, val)
end
#table_filter ⇒ Object
46 47 48 |
# File 'lib/tapsoob/operation/base.rb', line 46
# Include list of table names (opts[:tables]). A new empty Array is
# returned when the option is missing or nil/false.
#
# @return [Array]
def table_filter
  wanted = opts[:tables]
  return [] unless wanted

  wanted
end
#table_parallel_workers(table_name, row_count) ⇒ Object
Auto-detect number of workers for intra-table parallelization
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/tapsoob/operation/base.rb', line 152
# Picks how many workers to use when splitting a single table.
#
# Returns 1 (no intra-table parallelism) when piping to STDOUT or when the
# table has fewer than 100,000 rows. Otherwise the cap is half the CPU count,
# clamped to 2..8 workers, and smaller tables get a fraction of that cap.
#
# @param table_name [String, Symbol] currently unused
# @param row_count [Integer]
# @return [Integer] worker count, at least 1
def table_parallel_workers(table_name, row_count)
  # No dump_path means JSON goes straight to STDOUT, which several workers
  # cannot safely interleave.
  return 1 if dump_path.nil?

  # NOTE(review): an early `return 1 if self.is_a?(Tapsoob::Operation::Push)`
  # was commented out "temporarily for debugging"; confirm push should be
  # parallelized here.

  threshold = 100_000
  return 1 if row_count < threshold

  # Fall back to 4 CPUs when the count is unavailable; NotImplementedError is
  # not a StandardError, so it is listed explicitly.
  available_cpus =
    begin
      Etc.nprocessors
    rescue StandardError, NotImplementedError
      4
    end

  # Up to 50% of CPUs for one table, but never fewer than 2 or more than 8.
  max_workers = [[available_cpus / 2, 8].min, 2].max

  if row_count >= 5_000_000
    max_workers
  elsif row_count >= 1_000_000
    [max_workers / 2, 2].max
  elsif row_count >= 500_000
    [max_workers / 4, 2].max
  else
    2 # minimum for tables over the threshold
  end
end
#to_hash ⇒ Object
83 84 85 86 87 88 89 90 91 |
# File 'lib/tapsoob/operation/base.rb', line 83
# Serializable snapshot of this operation, written out by #store_session.
# :klass holds the concrete class name so a resumed session can recreate
# the same operation type.
#
# @return [Hash{Symbol => Object}]
def to_hash
  {
    klass: self.class.to_s,
    database_url: database_url,
    stream_state: stream_state,
    completed_tables: completed_tables,
    table_filter: table_filter,
  }
end