Class: Assimilate::Batch

Inherits:
Object
  • Object
show all
Defined in:
lib/assimilate/batch.rb

Constant Summary collapse

INSERT_BATCH_SIZE =

Default batch size for bulk loading into MongoDB.

1000

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(args) ⇒ Batch

Returns a new instance of Batch.



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/assimilate/batch.rb', line 4

# Builds a batch from an options hash and loads the currently stored records.
#
# @param args [Hash] read with the keys :catalog, :domain, :datestamp,
#   :idfield and :filename
# @return [Batch]
def initialize(args)
  @catalog = args[:catalog]
  # Name of the field under which the domain is stored, taken from the catalog config.
  @domainkey = @catalog.config[:domain]

  @domain, @datestamp, @idfield, @filename =
    args[:domain], args[:datestamp], args[:idfield], args[:filename]

  # Needs @catalog, @domainkey, @domain and @idfield, all assigned above.
  load_baseline

  # Buckets that #<< sorts incoming records into.
  @noops, @changes, @adds, @deletes = [], [], [], []
  @resolved = false
end

Instance Attribute Details

#datestamp ⇒ Object (readonly)

Returns the value of attribute datestamp.



2
3
4
# File 'lib/assimilate/batch.rb', line 2

# Reader for the batch's datestamp.
#
# @return [Object] the current value of @datestamp
def datestamp; @datestamp; end

#domain ⇒ Object (readonly)

Returns the value of attribute domain.



2
3
4
# File 'lib/assimilate/batch.rb', line 2

# Reader for the batch's domain.
#
# @return [Object] the current value of @domain
def domain; @domain; end

#idfield ⇒ Object (readonly)

Returns the value of attribute idfield.



2
3
4
# File 'lib/assimilate/batch.rb', line 2

# Reader for the name of the field that identifies a record.
#
# @return [Object] the current value of @idfield
def idfield; @idfield; end

Instance Method Details

#<<(record) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/assimilate/batch.rb', line 37

# Classifies one incoming record against the baseline.
#
# The record is converted with +to_hash+, its key is counted in @seen, and
# the hash is appended to exactly one of @adds (no baseline record for the
# key), @noops (equal to the stripped baseline record) or @changes.
#
# @param record [#to_hash] the incoming record
# @return [Array] the bucket the record was appended to
def <<(record)
  @seen ||= Hash.new(0)

  incoming = record.to_hash
  id = incoming[@idfield]
  @seen[id] += 1

  existing = stripped_record_for(id)
  bucket =
    if !existing
      @adds
    elsif existing == incoming
      @noops
    else
      @changes
    end
  bucket << incoming
end

#apply_deletes ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
# File 'lib/assimilate/batch.rb', line 108

# Marks every key in @deleted_keys as deleted in the catalog collection.
#
# Each record is not removed; instead the field named by the catalog's
# :deletion_marker config entry is set to this batch's datestamp.
#
# @return [Array] @deleted_keys
def apply_deletes
  @deleted_keys.each do |doomed_key|
    selector = { @domainkey => domain, idfield => doomed_key }
    tombstone = { "$set" => { @catalog.config[:deletion_marker] => datestamp } }
    @catalog.catalog.update(selector, tombstone)
  end
end

#apply_inserts ⇒ Object



122
123
124
125
126
127
# File 'lib/assimilate/batch.rb', line 122

# Inserts the records collected in @adds into the catalog collection,
# INSERT_BATCH_SIZE records per insert call.
def apply_inserts
  @adds.each_slice(INSERT_BATCH_SIZE) do |chunk|
    # Mongo insert can't handle CSV::Row objects, so each chunk goes through
    # decorate to be converted to regular hashes first.
    plain_hashes = decorate(chunk)
    @catalog.catalog.insert(plain_hashes)
  end
end

#apply_updates ⇒ Object



129
130
131
132
133
134
135
136
137
138
139
# File 'lib/assimilate/batch.rb', line 129

# Writes every record in @changes to the catalog collection.
#
# The stored record is selected by domain and id, and the incoming record's
# fields are applied with "$set".
#
# @return [Array] @changes
def apply_updates
  @changes.each do |changed|
    selector = { @domainkey => domain, idfield => changed[idfield] }
    modifier = { "$set" => changed }
    @catalog.catalog.update(selector, modifier)
  end
end

#commit ⇒ Object

Writes the updates to the catalog.



89
90
91
92
93
94
95
# File 'lib/assimilate/batch.rb', line 89

# Writes the batch to the catalog.
#
# Runs the steps strictly in the order listed: resolve, record the batch,
# then apply deletes, inserts and updates.
#
# @return [Object] whatever the final step (apply_updates) returns
def commit
  outcome = nil
  %i[resolve record_batch apply_deletes apply_inserts apply_updates].each do |step|
    outcome = send(step)
  end
  outcome
end

#decorate(records) ⇒ Object



141
142
143
144
145
146
# File 'lib/assimilate/batch.rb', line 141

# Returns plain-hash copies of the given records with the domain attached.
#
# Each record is converted with +to_hash+ and merged with
# { @domainkey => @domain }. The merge builds a new hash, so the records
# passed in are left untouched; Hash#to_hash returns the receiver itself,
# which means assigning the key in place would alter hashes the caller
# still holds.
#
# @param records [Enumerable<#to_hash>] records to prepare for insertion
# @return [Array<Hash>] one new hash per record, in the same order
def decorate(records)
  records.map { |r| r.to_hash.merge(@domainkey => @domain) }
end

#load_baseline ⇒ Object



22
23
24
25
26
27
28
29
30
31
# File 'lib/assimilate/batch.rb', line 22

# Reads every stored record for this domain and indexes it by id in @baseline.
#
# @raise [Assimilate::CorruptDataError] if two stored records share a key
# @return [Hash] the new baseline, mapping id value => stored record
def load_baseline
  index = {}
  @catalog.catalog.find(@domainkey => @domain).to_a.each do |stored|
    id = stored[@idfield]
    if index.key?(id)
      raise Assimilate::CorruptDataError, "Duplicate records for key [#{id}] in #{@domainkey} [#{@domain}]"
    end
    index[id] = stored
  end
  # Assigned only once the whole scan has succeeded.
  @baseline = index
end

#record_batch ⇒ Object



97
98
99
100
101
102
103
104
105
106
# File 'lib/assimilate/batch.rb', line 97

# Registers this batch in the catalog's batches collection.
#
# Refuses to register when a batch with the same datestamp, or with the same
# filename, is already stored for this domain.
#
# @raise [Assimilate::DuplicateImportError] on a duplicate datestamp or filename
# @return [Object] the result of the insert call
def record_batch
  scope = { @domainkey => @domain }
  already_recorded = lambda do |field, value|
    @catalog.batches.find(scope.merge(field => value)).to_a.any?
  end

  if already_recorded.call('datestamp', @datestamp)
    raise Assimilate::DuplicateImportError, "duplicate batch for datestamp #{datestamp}"
  end
  if already_recorded.call('filename', @filename)
    raise Assimilate::DuplicateImportError, "duplicate batch for file #{@filename}"
  end

  @catalog.batches.insert(scope.merge('datestamp' => @datestamp, 'filename' => @filename))
end

#resolve ⇒ Object

Computes anything needed before the updates can be written to the permanent store:

  • find records that have been deleted



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/assimilate/batch.rb', line 57

# Computes the derived state needed before updates can be written:
#
# * @deleted_keys - baseline keys that were not seen in this batch and whose
#   baseline record does not already carry the deletion marker
# * @updated_field_counts - for each field, how many changed records differ
#   from their stripped baseline record in that field
#
# Runs once; later calls return immediately because @resolved is set.
#
# @return [true, nil] true on the first call, nil afterwards
def resolve
  return if @resolved

  # @seen is created lazily by the first #<< call, so a batch that never
  # received a record has no @seen at all; treat that as "no keys seen".
  seen_keys = @seen ? @seen.keys : []
  marker = @catalog.config[:deletion_marker]
  @deleted_keys = (@baseline.keys - seen_keys).reject { |k| @baseline[k][marker] }

  @updated_field_counts = @changes.each_with_object(Hash.new(0)) do |rec, counts|
    key = rec[idfield]
    # NOTE(review): Hash#diff is not core Ruby; presumably provided by
    # ActiveSupport or a project extension - confirm it is loaded.
    rec.diff(stripped_record_for(key)).keys.each do |field|
      counts[field] += 1
    end
  end

  @resolved = true
end

#stats ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/assimilate/batch.rb', line 73

# Summarises what this batch would do to the catalog.
#
# Calls resolve first so the delete list and field counts are available.
#
# @return [Hash] counts and id lists keyed by symbol
def stats
  resolve

  baseline_total = @baseline.size
  added_total = @adds.count

  {
    baseline_count: baseline_total,
    final_count: baseline_total + added_total,
    adds_count: added_total,
    new_ids: @adds.map { |added| added[idfield] },
    deletes_count: @deleted_keys.count,
    deleted_ids: @deleted_keys,
    updates_count: @changes.count,
    unchanged_count: @noops.count,
    updated_fields: @updated_field_counts
  }
end

#stripped_record_for(key) ⇒ Object



33
34
35
# File 'lib/assimilate/batch.rb', line 33

# Looks up the baseline record for +key+ with its underscore-prefixed fields removed.
#
# The stored record itself is not modified; a filtered copy is returned.
#
# @param key [Object] value of the id field
# @return [Hash, nil] the filtered record, or nil when there is no baseline record
def stripped_record_for(key)
  stored = @baseline[key]
  # Keep only fields whose name does not match /^_/.
  stored ? stored.select { |field, _value| field !~ /^_/ } : stored
end