Class: Assimilate::Batch
- Inherits:
-
Object
- Object
- Assimilate::Batch
- Defined in:
- lib/assimilate/batch.rb
Constant Summary collapse
- INSERT_BATCH_SIZE =
default batch size for bulk loading into mongo
1000
Instance Attribute Summary collapse
-
#datestamp ⇒ Object
readonly
Returns the value of attribute datestamp.
-
#domain ⇒ Object
readonly
Returns the value of attribute domain.
-
#idfield ⇒ Object
readonly
Returns the value of attribute idfield.
Instance Method Summary collapse
- #<<(record) ⇒ Object
- #apply_deletes ⇒ Object
- #apply_inserts ⇒ Object
- #apply_updates ⇒ Object
-
#commit ⇒ Object
write the updates to the catalog.
- #decorate(records) ⇒ Object
- #deltas(h1, h2) ⇒ Object
-
#initialize(args) ⇒ Batch
constructor
A new instance of Batch.
- #load_baseline ⇒ Object
- #record_batch ⇒ Object
-
#resolve ⇒ Object
Compute anything needed before we can write updates to the permanent store: find records that have been deleted.
- #stats ⇒ Object
-
#stripped_record_for(key) ⇒ Object
The stripped record contains only the data values from the source (no internal values with leading underscores).
Constructor Details
#initialize(args) ⇒ Batch
Returns a new instance of Batch.
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
# File 'lib/assimilate/batch.rb', line 4
# Build a batch for one domain and immediately load that domain's stored
# records as the comparison baseline.
#
# @param args [Hash] options read with the keys :catalog, :domain,
#   :datestamp, :idfield and :filename. The catalog must respond to
#   +config+ (the domain key name is read from config[:domain]).
# @return [Batch] a new instance of Batch
def initialize(args)
  @catalog = args[:catalog]
  @domainkey = @catalog.config[:domain]
  @domain, @datestamp, @idfield, @filename =
    args.values_at(:domain, :datestamp, :idfield, :filename)

  # The baseline must be loaded before any record is pushed with <<.
  load_baseline

  @noops, @adds, @deletes = [], [], []
  @changes = {}
  @resolved = false
end
Instance Attribute Details
#datestamp ⇒ Object (readonly)
Returns the value of attribute datestamp.
2 3 4 |
# File 'lib/assimilate/batch.rb', line 2 def datestamp @datestamp end |
#domain ⇒ Object (readonly)
Returns the value of attribute domain.
2 3 4 |
# File 'lib/assimilate/batch.rb', line 2 def domain @domain end |
#idfield ⇒ Object (readonly)
Returns the value of attribute idfield.
2 3 4 |
# File 'lib/assimilate/batch.rb', line 2 def idfield @idfield end |
Instance Method Details
#<<(record) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/assimilate/batch.rb', line 40
# Classify one incoming record against the baseline.
#
# The record is converted with +to_hash+ and looked up by its @idfield
# value. It is then filed as an add (no baseline record), a no-op
# (identical to the stripped baseline record) or a change (the field
# differences are stored under the record's key).
#
# @param record [#to_hash] the incoming record
# @return [Object] the collection the record was appended to, or the
#   computed differences for a changed record
def <<(record)
  @seen ||= Hash.new(0)
  incoming = record.to_hash
  id = incoming[@idfield]
  # Count every occurrence of the key; resolve uses the keys to find deletions.
  @seen[id] += 1

  existing = stripped_record_for(id)
  return @adds << incoming unless existing
  return @noops << incoming if existing == incoming

  @changes[id] = deltas(existing, incoming)
end
#apply_deletes ⇒ Object
119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/assimilate/batch.rb', line 119
# Mark every key in @deleted_keys as deleted in the catalog by setting the
# configured deletion marker field to this batch's datestamp.
# @deleted_keys is populated by resolve.
def apply_deletes
  @deleted_keys.each do |key|
    store = @catalog.catalog
    selector = { @domainkey => domain, idfield => key }
    marking = { "$set" => { @catalog.config[:deletion_marker] => datestamp } }
    store.update(selector, marking)
  end
end
#apply_inserts ⇒ Object
133 134 135 136 137 138 |
# File 'lib/assimilate/batch.rb', line 133
# Insert the added records into the catalog, INSERT_BATCH_SIZE at a time.
def apply_inserts
  @adds.each_slice(INSERT_BATCH_SIZE) do |chunk|
    # mongo insert can't handle CSV::Row objects, must be converted to regular hashes
    documents = decorate(chunk)
    @catalog.catalog.insert(documents)
  end
end
#apply_updates ⇒ Object
140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/assimilate/batch.rb', line 140
# Write each changed record's field differences to the catalog, stamping
# the configured update marker field with this batch's datestamp.
def apply_updates
  update_marker = @catalog.config[:update_marker]
  @changes.each_pair do |key, diffs|
    store = @catalog.catalog
    selector = { @domainkey => domain, idfield => key }
    modification = { "$set" => diffs.merge(update_marker => datestamp) }
    store.update(selector, modification)
  end
end
#commit ⇒ Object
write the updates to the catalog
97 98 99 100 101 102 103 |
# File 'lib/assimilate/batch.rb', line 97 def commit resolve record_batch apply_deletes apply_inserts apply_updates end |
#decorate(records) ⇒ Object
153 154 155 156 157 158 159 160 |
# File 'lib/assimilate/batch.rb', line 153
# Prepare records for insertion by adding the domain field and the
# configured insertion marker (set to this batch's datestamp).
#
# Each result is a new Hash; the records passed in are left untouched, so
# the caller's data and @adds do not acquire the internal fields.
#
# @param records [Enumerable<#to_hash>] records to decorate
# @return [Array<Hash>] decorated copies, in the same order
def decorate(records)
  marker = @catalog.config[:insertion_marker]
  records.map do |r|
    # merge returns a fresh hash, so Hash#to_hash returning self is harmless here
    r.to_hash.merge(@domainkey => @domain, marker => datestamp)
  end
end
#deltas(h1, h2) ⇒ Object
58 59 60 |
# File 'lib/assimilate/batch.rb', line 58
# Compute the fields whose values differ between two hashes.
#
# Every key present in either hash is compared; a differing key maps to
# its value in h2 (nil when h2 lacks the key).
#
# @param h1 [Hash] the old values
# @param h2 [Hash] the new values
# @return [Hash] differing keys mapped to their h2 values
def deltas(h1, h2)
  changed = {}
  (h1.keys | h2.keys).each do |field|
    new_value = h2[field]
    changed[field] = new_value if h1[field] != new_value
  end
  changed
end
#load_baseline ⇒ Object
22 23 24 25 26 27 28 29 30 31 |
# File 'lib/assimilate/batch.rb', line 22
# Load every stored record for this domain and index it by its @idfield
# value in @baseline.
#
# @raise [Assimilate::CorruptDataError] if two stored records share a key
# @return [Hash] the baseline, keyed by id
def load_baseline
  by_key = {}
  @catalog.catalog.find(@domainkey => @domain).to_a.each do |doc|
    id = doc[@idfield]
    if by_key.include?(id)
      raise Assimilate::CorruptDataError, "Duplicate records for key [#{id}] in #{@domainkey} [#{@domain}]"
    end
    by_key[id] = doc
  end
  # Assigned only after the whole scan succeeds.
  @baseline = by_key
end
#record_batch ⇒ Object
105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/assimilate/batch.rb', line 105
# Register this batch in the catalog's batches collection.
#
# @raise [Assimilate::DuplicateImportError] if a batch with the same
#   domain and datestamp, or the same domain and filename, already exists
def record_batch
  # don't want leading underscore on attributes in the batches table
  dkey = @domainkey.gsub(/^_/,'')

  if @catalog.batches.find(dkey => @domain, 'datestamp' => @datestamp).to_a.any?
    raise(Assimilate::DuplicateImportError, "duplicate batch for datestamp #{datestamp}")
  end
  if @catalog.batches.find(dkey => @domain, 'filename' => @filename).to_a.any?
    raise(Assimilate::DuplicateImportError, "duplicate batch for file #{@filename}")
  end

  entry = {
    dkey => @domain,
    'datestamp' => @datestamp,
    'filename' => @filename
  }
  @catalog.batches.insert(entry)
end
#resolve ⇒ Object
Compute anything needed before we can write updates to the permanent store:
- find records that have been deleted
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/assimilate/batch.rb', line 64
# Compute anything needed before updates can be written to the permanent
# store. Runs once; later calls are no-ops.
#
# * @deleted_keys: baseline keys that were not seen in this batch and are
#   not already flagged with the configured deletion marker.
# * @updated_field_counts: for each field name, how many changed records
#   touch it.
#
# @return [true, nil] true when the work was done, nil if already resolved
def resolve
  return if @resolved

  # @seen is created lazily by <<, so it is nil for a batch with no records;
  # treat that as "nothing seen" rather than failing on nil.keys.
  seen_keys = @seen ? @seen.keys : []
  deletion_marker = @catalog.config[:deletion_marker]

  @deleted_keys = (@baseline.keys - seen_keys).reject { |k| @baseline[k][deletion_marker] }
  @updated_field_counts = @changes.each_with_object(Hash.new(0)) do |(_key, diffs), counts|
    diffs.each_key { |field| counts[field] += 1 }
  end
  @resolved = true
end
#stats ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/assimilate/batch.rb', line 80
# Summarize what this batch would do to the catalog. Calls resolve first
# so the deletion and per-field figures are available.
#
# @return [Hash] counts and id lists for adds, deletes, updates and
#   unchanged records, plus per-field update counts
def stats
  resolve

  baseline_size = @baseline.size
  added = @adds.count
  added_ids = @adds.map { |rec| rec[idfield] }

  {
    :baseline_count => baseline_size,
    :final_count => baseline_size + added,
    :adds_count => added,
    :new_ids => added_ids,
    :deletes_count => @deleted_keys.count,
    :deleted_ids => @deleted_keys,
    :updates_count => @changes.size,
    :updated_ids => @changes.keys,
    :unchanged_count => @noops.count,
    :updated_fields => @updated_field_counts
  }
end
#stripped_record_for(key) ⇒ Object
The stripped record contains only the data values from the source (no internal values with leading underscores). Any nil values are ignored; these should not be stored, but if they do appear in the catalog we don't want to include them when comparing new records with old ones.
36 37 38 |
# File 'lib/assimilate/batch.rb', line 36
# Return the baseline record for +key+ reduced to its source data values:
# fields whose names start with an underscore and fields whose value is
# nil are dropped.
#
# @param key [Object] the record's id value
# @return [Hash, nil] the stripped record, or the falsy lookup result when
#   the baseline has no record for the key
def stripped_record_for(key)
  stored = @baseline[key]
  return stored unless stored

  stored.select { |field, value| field !~ /^_/ && !value.nil? }
end