Class: Assimilate::Extender

Inherits:
Object
  • Object
show all
Defined in:
lib/assimilate/extender.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(args) ⇒ Extender

Returns a new instance of Extender.



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/assimilate/extender.rb', line 5

# Builds an extender for one catalog domain and loads that domain's current
# records as the baseline that incoming records are compared against.
#
# @param args [Hash] options, read with symbol keys:
#   :catalog  - object responding to #config (whose [:domain] entry names the
#               field that holds the domain) and #catalog (the collection
#               queried by #load_baseline)
#   :domain   - domain value the baseline query is restricted to
#   :idfield  - name of the field that identifies a record
#   :filename - stored in @filename (not read by any visible method)
#   :key      - field under which extended data is stored; when nil the
#               data is written as top-level attributes
#   :compare  - optional field used to decide whether incoming data is newer
# @raise whatever #load_baseline raises
def initialize(args)
  @catalog = args[:catalog]
  @domainkey = @catalog.config[:domain]

  @domain = args[:domain]
  @idfield = args[:idfield]
  @filename = args[:filename]
  @keyfield = args[:key]
  @comparison_field = args[:compare]

  load_baseline

  @noops = []
  @changes = []
  @adds = []
  @deletes = []
  # Created eagerly (same Hash.new(0) that #<< would otherwise create lazily)
  # so that #seen and #stats are usable even when no record is ever fed in.
  @seen = Hash.new(0)
end

Instance Attribute Details

#adds ⇒ Object (readonly)

Returns the value of attribute adds.



3
4
5
# File 'lib/assimilate/extender.rb', line 3

# Ids of incoming records that had no matching baseline record. #<< appends
# to this list and #apply_inserts creates one catalog entry per id in it.
def adds
  @adds
end

#changes ⇒ Object (readonly)

Returns the value of attribute changes.



3
4
5
# File 'lib/assimilate/extender.rb', line 3

# Ids of incoming records whose baseline record exists and for which
# #apply_this_update? returned true. #apply_updates writes one update per id.
def changes
  @changes
end

#deletes ⇒ Object (readonly)

Returns the value of attribute deletes.



3
4
5
# File 'lib/assimilate/extender.rb', line 3

# List set to an empty array by the constructor.
# NOTE(review): no method visible in this class appends to it - confirm
# whether deletions are handled elsewhere or not implemented.
def deletes
  @deletes
end

#domain ⇒ Object (readonly)

Returns the value of attribute domain.



2
3
4
# File 'lib/assimilate/extender.rb', line 2

# The :domain option given to the constructor. It is the value stored under
# the catalog's domain field in every insert and update selector.
def domain
  @domain
end

#idfield ⇒ Object (readonly)

Returns the value of attribute idfield.



2
3
4
# File 'lib/assimilate/extender.rb', line 2

# The :idfield option given to the constructor: the name of the field whose
# value identifies a record, both in the baseline and in incoming records.
def idfield
  @idfield
end

#keyfield ⇒ Object (readonly)

Returns the value of attribute keyfield.



2
3
4
# File 'lib/assimilate/extender.rb', line 2

# The :key option given to the constructor. When set, extended data is
# stored under this field of the catalog record; when nil, the data is
# written as top-level attributes of the record.
def keyfield
  @keyfield
end

#seen ⇒ Object (readonly)

Returns the value of attribute seen.



3
4
5
# File 'lib/assimilate/extender.rb', line 3

# Hash keyed by record id, filled by #<<. The value is the record's extended
# data (all fields except the id field) for adds and changes, and an empty
# hash for records classified as unchanged.
def seen
  @seen
end

Instance Method Details

#<<(record) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/assimilate/extender.rb', line 66

# Classifies one incoming record against the baseline and queues it for
# the later write (#apply_inserts / #apply_updates).
#
# The record is converted with #to_hash; the value under the id field is the
# record's key and every other field is its extended data. The key lands in:
# * @adds    - no baseline record exists for the key
# * @changes - a baseline record exists and #apply_this_update? says to write
# * @noops   - a baseline record exists and nothing needs to be written
#
# Repeated ids in the input are tolerated: a key is queued for insertion only
# once (the most recent data wins), and a no-op record never discards data
# that an earlier record already queued for the same key.
#
# @param record [#to_hash] incoming record
# @return [Hash] the entry now held in @seen for the record's key
def <<(record)
  @seen ||= Hash.new(0)

  hash = record.to_hash
  key = hash[@idfield]
  data = hash.reject { |k, _v| k == idfield }
  current_record = @baseline[key]
  if current_record
    if apply_this_update?(current_record, data)
      @changes << key
      @seen[key] = data
    else
      @noops << key
      # key? rather than ||= because @seen has a non-nil default value.
      # Overwriting here would make #apply_updates store an empty hash for a
      # key that an earlier record put into @changes.
      @seen[key] = {} unless @seen.key?(key)
    end
  else
    # A key absent from the baseline is in @seen only if it was already
    # queued as an add; queueing it again would insert a duplicate entry.
    @adds << key unless @seen.key?(key)
    @seen[key] = data
  end
  @seen[key]
end

#apply_inserts ⇒ Object

An “insert” here means a record for which we have extended data but which does not appear in the current catalog, so we need to create a stub entry for it.



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/assimilate/extender.rb', line 110

# Creates one catalog entry for every id collected in @adds.
#
# Each entry carries the domain and the id. The extended data recorded for
# the id is nested under keyfield when one is configured; otherwise its
# attributes are placed at the top level of the entry, with the domain and
# id fields taking precedence over same-named attributes.
def apply_inserts
  @adds.each do |key|
    extension = @seen[key]
    identity = { @domainkey => domain, idfield => key }
    document =
      if keyfield
        identity.merge(keyfield => extension)
      else
        # top-level extension
        extension.merge(identity)
      end
    @catalog.catalog.insert(document)
  end
end

#apply_this_update?(current_record, new_data) ⇒ Boolean

If there is a field to compare on (e.g. a timestamp), apply the update only if the incoming value of that field is newer; otherwise (no comparison field) compare the hashes and apply the update if there are any differences.

Returns:

  • (Boolean)


55
56
57
58
59
60
61
62
63
64
# File 'lib/assimilate/extender.rb', line 55

# Decides whether incoming extended data should be written over a record.
#
# * A comparison field is configured and the record already has data under
#   keyfield: defer to #is_newer.
# * Otherwise, with a keyfield: write when the stored data differs from the
#   incoming data.
# * Without a keyfield (top-level extension): write when any incoming
#   attribute differs from the record's current value for it.
#
# @param current_record [Hash] the baseline record
# @param new_data [Hash] incoming extended data
# @return [Boolean]
def apply_this_update?(current_record, new_data)
  return is_newer(current_record[keyfield], new_data) if @comparison_field && current_record[keyfield]
  return current_record[keyfield] != new_data if keyfield

  # top-level extension - compare all the attributes to be added
  new_data.any? { |field, value| current_record[field] != value }
end

#apply_updates ⇒ Object

An “update” means storing the extended data in the catalog record (which must already exist).



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/assimilate/extender.rb', line 132

# Stores the recorded extended data for every id collected in @changes.
#
# The record is selected by domain and id. The data is set under keyfield
# when one is configured; otherwise its attributes are set at the top level.
def apply_updates
  @changes.each do |key|
    extension = @seen[key]
    selector = { @domainkey => domain, idfield => key }
    # without a keyfield this is a top-level extension
    assignment = keyfield ? { keyfield => extension } : extension
    @catalog.catalog.update(selector, { "$set" => assignment })
  end
end

#commit ⇒ Object

write all the changes to the catalog



102
103
104
105
# File 'lib/assimilate/extender.rb', line 102

# Writes all pending changes to the catalog: first creates entries for the
# ids queued as adds, then stores the extended data for the ids queued as
# changes. Returns whatever #apply_updates returns.
def commit
  apply_inserts
  apply_updates
end

#is_newer(current_data, new_data) ⇒ Object



49
50
51
# File 'lib/assimilate/extender.rb', line 49

# Tells whether the incoming data carries a later comparison-field value
# than the stored data. Both values are converted with #to_i before being
# compared, so a missing (nil) value counts as 0.
#
# @param current_data [Hash] extended data currently stored
# @param new_data [Hash] incoming extended data
# @return [Boolean] true only when the incoming value is strictly greater
def is_newer(current_data, new_data)
  incoming_stamp = new_data[@comparison_field].to_i
  stored_stamp = current_data[@comparison_field].to_i
  incoming_stamp > stored_stamp
end

#load_baseline ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/assimilate/extender.rb', line 23

# Reads every catalog record of the configured domain and indexes it by id
# into @baseline.
#
# Records without an id value are skipped. Two records with the same id are
# an error unless both carry an '_active' field with differing values, in
# which case the active one is kept.
#
# @raise [Assimilate::CorruptDataError] on an unresolvable duplicate id, or
#   when no record with an id was found at all
def load_baseline
  index = {}
  @catalog.catalog.find(@domainkey => @domain).to_a.each do |candidate|
    key = candidate[@idfield]
    # ignore records that are missing a key value.
    next unless key

    if index.include?(key)
      # conflict with existing record for the same key
      # HACK - HARD-CODED BEHAVIOR:
      # * look for a boolean field called '_active'; when the two records
      #   disagree on it, keep whichever one is active.
      previous = index[key]
      resolvable = previous.include?('_active') &&
                   candidate.include?('_active') &&
                   previous['_active'] != candidate['_active']
      unless resolvable
        raise Assimilate::CorruptDataError, "Duplicate records for key [#{key}] in #{@domainkey} [#{@domain}]"
      end

      candidate = [previous, candidate].find { |r| r['_active'] }
    end
    index[key] = candidate
  end

  # assigned only after the whole scan succeeded
  @baseline = index
  raise Assimilate::CorruptDataError, "Unable to find any records with #{@idfield} in #{@domainkey} [#{@domain}]" if @baseline.empty?
end

#stats ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/assimilate/extender.rb', line 88

# Summarises what has been processed so far.
#
# Safe to call before any record has been fed in: a missing @seen is treated
# as "no records seen" instead of raising NoMethodError on nil.
#
# @return [Hash] with symbol keys:
#   :baseline_count  - number of records in the baseline
#   :final_count     - baseline size plus the number of queued adds
#   :distinct_ids    - number of distinct ids seen in the input
#   :adds_count      - number of queued adds
#   :new_ids         - the queued add ids themselves
#   :updates_count   - number of queued changes
#   :updated_fields  - per field name, how many seen records carry it
#   :unchanged_count - number of records classified as unchanged
def stats
  seen = @seen || {}
  updated_fields = seen.each_with_object(Hash.new(0)) do |(_id, fields), tally|
    fields.each_key { |field| tally[field] += 1 }
  end

  {
    :baseline_count => @baseline.size,
    :final_count => @baseline.size + @adds.count,
    :distinct_ids => seen.size,
    :adds_count => @adds.count,
    :new_ids => @adds,
    :updates_count => @changes.count,
    :updated_fields => updated_fields,
    :unchanged_count => @noops.count
  }
end