Class: Assimilate::Batch

Inherits:
Object
  • Object
show all
Defined in:
lib/assimilate/batch.rb

Constant Summary collapse

INSERT_BATCH_SIZE =

Default batch size for bulk loading into MongoDB.

1000

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(args) ⇒ Batch

Returns a new instance of Batch.



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/assimilate/batch.rb', line 4

# Build a batch for one import into a catalog domain.
#
# @param args [Hash] expects :catalog (an object exposing +config+, +catalog+
#   and +batches+), :domain, :datestamp, :idfield and :filename
#
# Loads the baseline records for the domain immediately (see load_baseline),
# so construction may raise Assimilate::CorruptDataError.
def initialize(args)
  @catalog = args[:catalog]
  @domainkey = @catalog.config[:domain]

  @domain = args[:domain]
  @datestamp = args[:datestamp]
  @idfield = args[:idfield]
  @filename = args[:filename]

  load_baseline

  # per-key count of incoming records; created eagerly so that resolve/stats/
  # commit also work on a batch that never received a record
  @seen = Hash.new(0)
  @noops = []
  @changes = {}
  @adds = []
  @deletes = []
  @resolved = false
end

Instance Attribute Details

#datestampObject (readonly)

Returns the value of attribute datestamp.



2
3
4
# File 'lib/assimilate/batch.rb', line 2

# @return [Object] the datestamp passed to the constructor as args[:datestamp]
def datestamp; @datestamp; end

#domainObject (readonly)

Returns the value of attribute domain.



2
3
4
# File 'lib/assimilate/batch.rb', line 2

# @return [Object] the domain passed to the constructor as args[:domain]
def domain; @domain; end

#idfieldObject (readonly)

Returns the value of attribute idfield.



2
3
4
# File 'lib/assimilate/batch.rb', line 2

# @return [Object] the id field name passed to the constructor as args[:idfield]
def idfield; @idfield; end

Instance Method Details

#<<(record) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/assimilate/batch.rb', line 40

# Add one incoming record to the batch and classify it against the baseline.
#
# The record is converted with +to_hash+ and looked up by its id field. It is
# classified as:
# * an add, when no baseline record exists for its key;
# * a no-op, when it has no field differences from the stripped baseline
#   record (a field that is nil in the incoming record and absent from the
#   stripped baseline is not a difference);
# * a change otherwise, with the differing fields stored under its key.
#
# Each key's occurrence count is tracked in @seen.
# NOTE(review): duplicate ids within one import are counted but not rejected.
#
# @return [self] so calls can be chained: <tt>batch << a << b</tt>
def <<(record)
  @seen ||= Hash.new(0)

  hash = record.to_hash
  key = hash[@idfield]
  @seen[key] += 1
  current_record = stripped_record_for(key)
  if current_record
    # classify on the actual field deltas rather than Hash#==, so nil-only
    # differences (ignored by stripped_record_for) don't become empty updates
    diffs = deltas(current_record, hash)
    if diffs.empty?
      @noops << hash
    else
      @changes[key] = diffs
    end
  else
    @adds << hash
  end
  self
end

#apply_deletesObject



119
120
121
122
123
124
125
126
127
128
129
# File 'lib/assimilate/batch.rb', line 119

# Mark every key computed as deleted by resolve.
#
# For each key, updates the catalog document matching this batch's domain and
# that id, setting the configured deletion-marker field to this batch's
# datestamp. Documents are flagged, not removed.
#
# Expects resolve to have run (it populates @deleted_keys); commit does this.
def apply_deletes
  deletion_field = @catalog.config[:deletion_marker]
  @deleted_keys.each do |deleted_key|
    selector = { @domainkey => domain, idfield => deleted_key }
    @catalog.catalog.update(selector, { "$set" => { deletion_field => datestamp } })
  end
end

#apply_insertsObject



133
134
135
136
137
138
# File 'lib/assimilate/batch.rb', line 133

# Insert all newly-seen records into the catalog.
#
# Records are sent in slices of at most INSERT_BATCH_SIZE. Each slice is first
# passed through decorate, which stamps the domain and insertion marker and
# returns plain hashes (the mongo insert cannot take CSV::Row objects).
def apply_inserts
  @adds.each_slice(INSERT_BATCH_SIZE) { |chunk| @catalog.catalog.insert(decorate(chunk)) }
end

#apply_updatesObject



140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/assimilate/batch.rb', line 140

# Write the changed fields of every updated record back to the catalog.
#
# For each entry in @changes (id => changed fields), updates the catalog
# document matching this batch's domain and that id. It uses "$set" with the
# changed fields plus the configured update-marker field set to this batch's
# datestamp.
def apply_updates
  update_field = @catalog.config[:update_marker]
  @changes.each_pair do |changed_key, changed_fields|
    selector = { @domainkey => domain, idfield => changed_key }
    modifier = { "$set" => changed_fields.merge(update_field => datestamp) }
    @catalog.catalog.update(selector, modifier)
  end
end

#commitObject

Write the updates to the catalog.



97
98
99
100
101
102
103
# File 'lib/assimilate/batch.rb', line 97

# Write this batch to the catalog.
#
# Steps run in this order:
# 1. resolve — compute deleted keys and per-field update counts (no writes);
# 2. record_batch — record the import in the batches collection; raises
#    Assimilate::DuplicateImportError for a repeated datestamp or filename,
#    before any catalog document is written;
# 3. apply_deletes, apply_inserts, apply_updates — the catalog writes.
#
# NOTE(review): the batch is recorded before the catalog writes, so a failure
# part-way through steps 3 leaves a batch record for a partially applied import.
#
# @return whatever apply_updates returns
def commit
  resolve
  record_batch
  apply_deletes
  apply_inserts
  apply_updates
end

#decorate(records) ⇒ Object



153
154
155
156
157
158
159
160
# File 'lib/assimilate/batch.rb', line 153

# Prepare records for insertion into the catalog.
#
# Returns a new array of plain hashes. Each hash is the record's +to_hash+ plus
# this batch's domain under the domain key and this batch's datestamp under the
# configured insertion-marker field.
#
# The input records are left untouched. Previously they were mutated in place,
# and since Hash#to_hash returns self, that leaked the extra fields into the
# hashes callers had passed to <<.
#
# @param records [Enumerable] objects responding to +to_hash+
# @return [Array<Hash>]
def decorate(records)
  marker = @catalog.config[:insertion_marker]
  records.map { |r| r.to_hash.merge(@domainkey => @domain, marker => datestamp) }
end

#deltas(h1, h2) ⇒ Object



58
59
60
# File 'lib/assimilate/batch.rb', line 58

# Compute the fields whose values differ between two hashes.
#
# Every key present in either hash is compared (a missing key reads as nil).
# The result maps each differing key to its value in +h2+, so a key only in
# +h1+ maps to nil.
#
# @param h1 [Hash] the old record
# @param h2 [Hash] the new record
# @return [Hash] changed keys => new values
def deltas(h1, h2)
  changed = {}
  (h1.keys | h2.keys).each do |field|
    new_value = h2[field]
    changed[field] = new_value if h1[field] != new_value
  end
  changed
end

#load_baselineObject



22
23
24
25
26
27
28
29
30
31
# File 'lib/assimilate/batch.rb', line 22

# Load every catalog record stored for this batch's domain into @baseline,
# indexed by the value of the id field.
#
# @raise [Assimilate::CorruptDataError] if two stored records share an id;
#   @baseline is only assigned once all records have been indexed
def load_baseline
  by_id = {}
  @catalog.catalog.find(@domainkey => @domain).to_a.each do |doc|
    id = doc[@idfield]
    if by_id.key?(id)
      raise Assimilate::CorruptDataError, "Duplicate records for key [#{id}] in #{@domainkey} [#{@domain}]"
    end
    by_id[id] = doc
  end
  @baseline = by_id
end

#record_batchObject



105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/assimilate/batch.rb', line 105

# Record this import in the batches collection, refusing duplicates.
#
# The stored document has the domain (under the domain key with a leading
# underscore removed), 'datestamp' and 'filename'.
#
# @raise [Assimilate::DuplicateImportError] if a batch for this domain already
#   exists with the same datestamp, or with the same filename
# @return whatever the batches collection's +insert+ returns
def record_batch
  # batch documents should not carry the leading underscore used on catalog attributes
  batch_domain_field = @domainkey.gsub(/^_/, '')

  if @catalog.batches.find(batch_domain_field => @domain, 'datestamp' => @datestamp).to_a.any?
    raise Assimilate::DuplicateImportError, "duplicate batch for datestamp #{datestamp}"
  end
  if @catalog.batches.find(batch_domain_field => @domain, 'filename' => @filename).to_a.any?
    raise Assimilate::DuplicateImportError, "duplicate batch for file #{@filename}"
  end

  batch_doc = {
    batch_domain_field => @domain,
    'datestamp' => @datestamp,
    'filename' => @filename
  }
  @catalog.batches.insert(batch_doc)
end

#resolveObject

Compute everything needed before updates can be written to the permanent store:

  • find records that have been deleted
  • count how many changed records touched each field



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/assimilate/batch.rb', line 64

# Compute what is needed before updates can be written to the permanent store.
# It only runs once per batch; later calls are no-ops.
#
# Sets:
# * @deleted_keys — baseline keys that were not seen in this batch and are not
#   already flagged with the configured deletion marker;
# * @updated_field_counts — for each field name, how many changed records
#   touched it.
#
# Works on a batch that received no records: every unflagged baseline key is
# then treated as deleted. Previously this raised NoMethodError on nil @seen.
#
# @return [true, nil] true when the computation ran, nil if already resolved
def resolve
  return if @resolved

  deletion_marker = @catalog.config[:deletion_marker]
  seen_keys = @seen ? @seen.keys : []
  @deleted_keys = (@baseline.keys - seen_keys).reject { |k| @baseline[k][deletion_marker] }

  @updated_field_counts = @changes.each_with_object(Hash.new(0)) do |(_key, diffs), counts|
    diffs.each_key { |field| counts[field] += 1 }
  end

  @resolved = true
end

#statsObject



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/assimilate/batch.rb', line 80

# Summarize this batch, resolving it first if necessary.
#
# @return [Hash] with :baseline_count, :final_count (baseline size plus adds),
#   :adds_count, :new_ids, :deletes_count, :deleted_ids, :updates_count,
#   :updated_ids, :unchanged_count and :updated_fields (field => change count)
def stats
  resolve
  baseline_size = @baseline.size
  added_count = @adds.count
  added_ids = @adds.map { |rec| rec[idfield] }

  {
    baseline_count: baseline_size,
    final_count: baseline_size + added_count,
    adds_count: added_count,
    new_ids: added_ids,
    deletes_count: @deleted_keys.count,
    deleted_ids: @deleted_keys,
    updates_count: @changes.size,
    updated_ids: @changes.keys,
    unchanged_count: @noops.count,
    updated_fields: @updated_field_counts
  }
end

#stripped_record_for(key) ⇒ Object

The stripped record contains only the data values from the source (no internal values with leading underscores). Any nil values are ignored: they should not be stored, but if they do appear in the catalog, we don't want to include them when comparing new records against old ones.



36
37
38
# File 'lib/assimilate/batch.rb', line 36

# Return the baseline record for +key+ reduced to its source data fields.
#
# Fields whose names start with an underscore and fields whose value is nil
# are dropped.
#
# @return [Hash, nil] the stripped record, or nil when +key+ has no baseline record
def stripped_record_for(key)
  record = @baseline[key]
  return record unless record

  record.select { |field, value| field !~ /^_/ && !value.nil? }
end