Class: Tapsoob::DataStream::FilePartition
- Defined in:
- lib/tapsoob/data_stream/file_partition.rb
Overview
DataStream variant for file-based parallelized loading. Each worker reads a different portion of the NDJSON file.
Constant Summary
Constants inherited from Base
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary collapse
- #complete? ⇒ Boolean
- #fetch_file(dump_path) ⇒ Object
-
#initialize(db, state, opts = {}) ⇒ FilePartition
constructor
A new instance of FilePartition.
Methods inherited from Base
#encode_rows, #error, #error=, factory, #fetch, #fetch_data_from_database, #fetch_data_to_database, #fetch_rows, #import_rows, #increment, #log, #max_chunksize_training, #order_by, #parse_encoded_data, parse_json, #string_columns, #table, #table_name, #table_name_sql, #to_hash, #to_json, #update_chunksize_stats, #verify_stream
Constructor Details
#initialize(db, state, opts = {}) ⇒ FilePartition
Returns a new instance of FilePartition.
9 10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/tapsoob/data_stream/file_partition.rb', line 9

# Builds a partition-aware data stream on top of the Base stream state.
#
# @param db    [Object] database handle, forwarded untouched to Base
# @param state [Hash]   stream state; may carry :line_range => [start, end]
# @param opts  [Hash]   extra options forwarded to Base
def initialize(db, state, opts = {})
  super(db, state, opts)

  # Partition-specific defaults; any values already present in @state win.
  defaults = {
    :line_range => nil,  # [start_line, end_line] of this worker's slice
    :lines_read => 0
  }
  @state = defaults.merge(@state)

  # Seed the read cursor at the first line of this worker's slice, if one
  # was assigned.
  range = @state[:line_range]
  @state[:current_line] = range.first if range
end
Instance Method Details
#complete? ⇒ Boolean
78 79 80 81 82 83 84 |
# File 'lib/tapsoob/data_stream/file_partition.rb', line 78

# Whether this worker has consumed its entire assigned line range.
#
# @return [Boolean] true when no range was assigned, or truthy once the
#   read cursor has advanced past the last line of the range
def complete?
  range = state[:line_range]
  return true if range.nil?

  _start_line, end_line = range
  # Truthy only once current_line is known and has moved past the slice.
  result = state[:current_line] && state[:current_line] > end_line
  log.debug "DataStream::FilePartition#complete? current_line=#{state[:current_line]} end_line=#{end_line} result=#{result} table=#{table_name}"
  result
end
#fetch_file(dump_path) ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/tapsoob/data_stream/file_partition.rb', line 23

# Reads up to one chunk of NDJSON lines from this worker's assigned slice
# of the table dump file and assembles them into a single rows payload.
#
# @param dump_path [String] root of the dump directory; data files live
#   under "<dump_path>/data/<table_name>.json"
# @return [Hash] {:table_name, :header, :data, :types}, or {} when this
#   worker has no line range assigned
def fetch_file(dump_path)
  return {} if state[:line_range].nil?

  file_path = File.join(dump_path, "data", "#{table_name}.json")
  start_line, end_line = state[:line_range]

  table_name_val = nil
  header_val = nil
  types_val = nil
  data_batch = []

  # Read lines in this worker's range
  File.open(file_path, 'r') do |file|
    # Skip to current position: lines before the cursor belong to other
    # workers or were consumed by a previous call.
    state[:current_line].times { file.gets }

    # Read up to chunksize lines, but don't exceed end_line
    lines_to_read = [state[:chunksize], end_line - state[:current_line] + 1].min

    log.debug "DataStream::FilePartition#fetch_file: current_line=#{state[:current_line]} end_line=#{end_line} lines_to_read=#{lines_to_read} chunksize=#{state[:chunksize]} table=#{table_name}"

    lines_to_read.times do
      break if file.eof? || state[:current_line] > end_line

      line = file.gets
      next unless line

      # Each NDJSON line is a self-contained chunk: table metadata + rows.
      # Metadata is taken from the first chunk that provides it (||=).
      chunk = JSON.parse(line.strip)
      table_name_val ||= chunk["table_name"]
      header_val ||= chunk["header"]
      types_val ||= chunk["types"]
      data_batch.concat(chunk["data"]) if chunk["data"]

      state[:current_line] += 1
    end
  end

  log.debug "DataStream::FilePartition#fetch_file: read #{data_batch.size} rows in #{state[:current_line] - start_line} lines table=#{table_name}"

  # Apply skip-duplicates if needed.
  # BUG FIX: the condition was the bare array literal `[:"skip-duplicates"]`,
  # which is always truthy in Ruby, so deduplication ran unconditionally.
  # Gate it on the actual option instead.
  # NOTE(review): assumes Base exposes the opts hash via `options` — confirm
  # against lib/tapsoob/data_stream/base.rb.
  data_batch = data_batch.uniq if options[:"skip-duplicates"]

  # :size is the total line count of this slice; :offset is how far the
  # cursor has progressed into it (used for progress reporting).
  state[:size] = end_line - start_line + 1
  state[:offset] = state[:current_line] - start_line

  rows = {
    :table_name => table_name_val,
    :header => header_val,
    :data => data_batch,
    :types => types_val
  }

  update_chunksize_stats

  rows
end