Class: Praxis::Mapper::IdentityMap

Inherits:
Object
  • Object
show all
Defined in:
lib/praxis-mapper/identity_map.rb,
lib/praxis-mapper/support/factory_girl.rb

Defined Under Namespace

Classes: UnknownIdentity, UnloadedRecordException, UnsupportedModel

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(scope = {}) ⇒ IdentityMap

TODO: support multiple connections



68
69
70
71
72
# File 'lib/praxis-mapper/identity_map.rb', line 68

# Builds an identity map: creates a new ConnectionManager, stores the given
# scope, and calls #clear! to set up the internal collections.
#
# @param scope [Hash] stored as-is in @scope (defaults to an empty hash)
def initialize(scope={})
  @connection_manager = ConnectionManager.new
  @scope = scope
  clear! # (re)creates @rows, @staged, @row_keys, @queries, @blueprint_cache, @secondary_indexes
end

Class Attribute Details

.config ⇒ Object

Returns the value of attribute config.



16
17
18
# File 'lib/praxis-mapper/identity_map.rb', line 16

# Class-level reader: returns the value held in the class's @config
# instance variable.
def config
  @config
end

Instance Attribute Details

#blueprint_cache ⇒ Object (readonly)

Returns the value of attribute blueprint_cache.



12
13
14
# File 'lib/praxis-mapper/identity_map.rb', line 12

# Reader for @blueprint_cache, which #clear! initializes as a Hash that
# creates an empty Hash for each blueprint class on first access.
def blueprint_cache
  @blueprint_cache
end

#queries ⇒ Object (readonly)

Returns the value of attribute queries.



12
13
14
# File 'lib/praxis-mapper/identity_map.rb', line 12

# Reader for @queries, which #clear! initializes as a Hash that creates an
# empty Set for each key on first access (#load and #subload add the queries
# they run, keyed by model).
def queries
  @queries
end

#scope ⇒ Object

Returns the value of attribute scope.



13
14
15
# File 'lib/praxis-mapper/identity_map.rb', line 13

# Reader for @scope, the value handed to the constructor (or assigned later
# through the scope writer, as .setup! does).
def scope
  @scope
end

#unloaded ⇒ Object (readonly)

Returns the value of attribute unloaded.



12
13
14
# File 'lib/praxis-mapper/identity_map.rb', line 12

# Reader for @unloaded.
# NOTE(review): none of the methods shown on this page assign @unloaded, so
# this appears to always return nil -- confirm against the full source.
def unloaded
  @unloaded
end

Class Method Details

.current ⇒ IdentityMap

Returns current identity map from thread-local variable.

Returns:

  • (IdentityMap)

    current identity map from thread-local variable



28
29
30
31
32
# File 'lib/praxis-mapper/identity_map.rb', line 28

# Fetches the identity map stored under :_praxis_mapper_identity_map on the
# current thread.
#
# @return the stored identity map
# @raise [RuntimeError] when nothing (or a falsy value) is stored
def self.current
  Thread.current[:_praxis_mapper_identity_map] || raise("current IdentityMap not set")
end

.current=(identity_map) ⇒ Object

Stores given identity map in a thread-local variable

Parameters:



22
23
24
# File 'lib/praxis-mapper/identity_map.rb', line 22

# Stores the given identity map under :_praxis_mapper_identity_map on the
# current thread, where .current and .current? look it up.
#
# @param identity_map the object to store
def self.current=(identity_map)
  Thread.current[:_praxis_mapper_identity_map] = identity_map
end

.current? ⇒ Boolean

Returns whether identity map thread-local variable has been set.

Returns:

  • (Boolean)

    whether identity map thread-local variable has been set



36
37
38
# File 'lib/praxis-mapper/identity_map.rb', line 36

# Tells whether the current thread holds an identity map.
#
# @return [Boolean] true only when :_praxis_mapper_identity_map is set on the
#   current thread and its value is a Praxis::Mapper::IdentityMap
def self.current?
  return false unless Thread.current.key?(:_praxis_mapper_identity_map)

  Thread.current[:_praxis_mapper_identity_map].kind_of?(Praxis::Mapper::IdentityMap)
end

.setup!(scope = {}) ⇒ Object

TODO: how come scope can be set from 3 different methods?

Examples:

{:account => [:account_id, 71], :user => [:user_id, 2]}

Parameters:

  • scope (Hash) (defaults to: {})

    a set of named filters to apply in query



54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/praxis-mapper/identity_map.rb', line 54

# Ensures the current thread has an identity map carrying the given scope.
#
# With no current map, a new one is built from +scope+, stored via
# .current= and returned. With an existing map that is still clear, its
# scope is replaced and that map is returned.
#
# @param scope [Hash] a set of named filters to apply in query
# @return the current identity map
# @raise [RuntimeError] when the existing map is not clear
def self.setup!(scope={})
  return (self.current = self.new(scope)) unless self.current?

  unless self.current.clear?
    raise "Denied for a pre-existing condition: Identity map has been used."
  end

  self.current.scope = scope
  self.current
end

Instance Method Details

#<<(record) ⇒ Object



390
391
392
393
394
395
396
397
398
399
400
401
402
# File 'lib/praxis-mapper/identity_map.rb', line 390

# Unconditionally registers +record+ in this identity map: appends it to the
# rows of its class, points the record back at this map, and for every
# identity declared by the class drops the staged key and indexes the record
# under that key (overwriting any previous entry).
#
# @param record a model instance; its class must respond to +identities+
# @return whatever +identities.each+ returns for the record's class
def <<(record)
  klass = record.class

  @rows[klass] << record
  record.identity_map = self

  klass.identities.each do |identity_name|
    identity_value = record.send(identity_name)

    get_staged(klass, identity_name).delete(identity_value)
    @row_keys[klass][identity_name].store(identity_value, record)
  end
end

#add_record(record) ⇒ Object



509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
# File 'lib/praxis-mapper/identity_map.rb', line 509

# Registers +record+ unless one of its identities is already known.
#
# Identities are processed in order; as soon as one is found in @row_keys the
# method returns false. Identities processed before that point have already
# been indexed and unstaged, and stay that way.
#
# FIXME: Should we be overwriting (possibly) a "nil" value from before?
#        (due to that row not being found by a previous query)
#        (That'd be odd since that means we tried to load that same identity)
#
# @param record a model instance responding to +identities+ (name => value pairs)
# @return the record when it was added, false otherwise
def add_record(record)
  klass = record.class

  record.identities.each do |identity_name, identity_value|
    known = @row_keys[klass][identity_name]
    return false if known.key?(identity_value)

    get_staged(klass, identity_name).delete(identity_value)
    known[identity_value] = record
  end

  record.identity_map = self
  @rows[klass] << record
  record
end

#add_records(records) ⇒ Object



462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
# File 'lib/praxis-mapper/identity_map.rb', line 462

# Adds a batch of records and stages the keys of their tracked associations.
#
# The query attached to the first record (via +_query+) supplies the tracked
# associations for the whole batch. For each of them the query itself is
# collected under :_queries for the associated model, and the keys computed
# by #stage_for! are collected under the returned association key. Records
# are then added one by one through #add_record, and finally everything
# collected is handed to #stage, model by model.
#
# @param records list of model instances
# @return [Array, nil] the records that #add_record accepted, or nil when
#   +records+ is empty
def add_records(records)
  return if records.empty?

  pending = Hash.new do |by_model, staged_model|
    by_model[staged_model] = Hash.new { |by_key, key_name| by_key[key_name] = Set.new }
  end

  source_query = records.first._query
  associations = source_query ? source_query.tracked_associations : []

  # Remember which query tracked each associated model.
  associations.each do |association|
    pending[association[:model]][:_queries] << source_query
  end

  # Collect the keys to stage for each tracked association.
  associations.each do |association|
    staging_key, keys = stage_for!(association, records)
    keys.each { |row_key| pending[association[:model]][staging_key].add(row_key) }
  end

  accepted = records.each_with_object([]) do |record, added|
    added << record if add_record(record)
  end

  pending.each { |model_to_stage, data| stage(model_to_stage, data) }

  accepted
end

#all(model, conditions = {}) ⇒ Object



325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
# File 'lib/praxis-mapper/identity_map.rb', line 325

# Returns the loaded records of +model+ matching the FIRST entry of
# +conditions+ (any further entries are ignored).
#
# When the condition key has an entry in @row_keys for the model, each value
# is resolved through #row_by_key, and keys recorded as "not found" (stored
# as nil) are dropped from the result. Previously only the single-value
# lookup dropped them, while the multi-value lookup returned nil
# placeholders. Any other key is served from the secondary index built by
# #index.
#
# @param model the model class to look up
# @param conditions [Hash] key => list of values; empty means "all rows"
# @return [Array] matching records; for a single value on a non-identity key
#   this is the index's own bucket, not a copy
# @raise [UnloadedRecordException] via #row_by_key when an identity value was
#   never loaded
def all(model,conditions={})
  return rows_for(model) if conditions.empty?

  key, values = conditions.first

  if @row_keys[model].has_key?(key)
    # one lookup per value; nil marks a key that was queried but not found
    values.collect { |value| row_by_key(model, key, value) }.compact
  elsif values.size == 1
    # optimize the common case of a single value
    index(model, key, values[0])
  else
    values.each_with_object(Array.new) do |value, results|
      results.concat(index(model, key, value))
    end
  end
end

#clear! ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/praxis-mapper/identity_map.rb', line 74

# Resets every internal collection of the identity map to its empty state.
#
# @return [Hash] the new (empty) secondary index hash, i.e. the value of the
#   last assignment
def clear!
  # Loaded records, grouped by model.
  @rows = Hash.new { |rows, model| rows[model] = [] }

  # Keys waiting to be loaded, e.g.
  #   {Instance => {:id => Set.new(1,2,3), :name => Set.new("George Jr.") } }
  @staged = Hash.new do |staged, model|
    staged[model] = Hash.new { |by_identity, identity_name| by_identity[identity_name] = Set.new }
  end

  # Loaded records indexed by identity value, e.g.
  #   {"instances"=>{:id=>{1=>Object.new}}
  @row_keys = Hash.new do |by_model, model|
    by_model[model] = Hash.new { |by_identity, key_name| by_identity[key_name] = {} }
  end

  # Queries that have been run, grouped in one Set per key.
  @queries = Hash.new { |queries, key| queries[key] = Set.new }

  # see how it feels to store blueprints here, e.g.
  #   @blueprints[User][some_object] = User.new(some_object)
  @blueprint_cache = Hash.new { |cache, blueprint_class| cache[blueprint_class] = {} }

  # TODO: rework this so it's a hash with default values and simplify #index
  @secondary_indexes = {}
end

#clear? ⇒ Boolean

Returns:

  • (Boolean)


41
42
43
44
45
46
# File 'lib/praxis-mapper/identity_map.rb', line 41

# Tells whether the map is unused, judged by @rows, @staged, @row_keys and
# @queries all being empty (checked in that order, stopping at the first
# non-empty one). The blueprint cache and secondary indexes are not
# consulted.
#
# @return [Boolean]
def clear?
  [@rows, @staged, @row_keys, @queries].all?(&:empty?)
end

#connection(name) ⇒ Object



385
386
387
# File 'lib/praxis-mapper/identity_map.rb', line 385

# Delegates to the connection manager's +checkout+ for the named connection
# and returns its result.
#
# @param name identifier passed through to ConnectionManager#checkout
def connection(name)
  @connection_manager.checkout(name)
end

#extract_keys(field, records) ⇒ Object



405
406
407
408
409
410
411
412
413
414
415
416
# File 'lib/praxis-mapper/identity_map.rb', line 405

# Collects the values of +field+ from +records+, skipping incomplete ones.
#
# @param field [Symbol, Array<Symbol>] a single attribute name, or a list of
#   names forming a composite identity
# @param records list of objects responding to the named attribute(s)
# @return [Array] for a single field, the non-nil values; for a composite
#   field, one array of values per record, omitting any that contain nil
def extract_keys(field, records)
  return records.collect(&field).compact unless field.kind_of?(Array)

  # composite identities
  records.each_with_object([]) do |record, keys|
    composite = field.collect { |col| record.send(col) }
    keys << composite unless composite.include?(nil)
  end
end

#finalize!(*models) ⇒ Object



178
179
180
181
182
183
184
185
186
187
188
# File 'lib/praxis-mapper/identity_map.rb', line 178

# Loads staged keys until a pass loads nothing new.
#
# Each pass walks the given models (or every model currently in @staged when
# none are given) and stops at the first one whose #finalize_model! returns
# something non-empty; in that case the whole process restarts for ALL staged
# models, since the load may have staged further keys.
#
# @param models model classes to start with (optional)
# @return [nil]
def finalize!(*models)
  targets = models.empty? ? @staged.keys : models

  loaded_something = targets.any? { |model| finalize_model!(model).any? }

  finalize! if loaded_something
end

#finalize_model!(model, query = nil) ⇒ Object

Internal: not part of the public API — never call this yourself. FIXME: make private and fix the specs that break?



193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
# File 'lib/praxis-mapper/identity_map.rb', line 193

# Loads everything currently staged for +model+ and registers the results.
# Internal helper used by #finalize! and #load.
#
# Steps:
# 1. Take the :_queries entry (queries that tracked this model) out of the
#    staged data and return early when no keys are staged.
# 2. Build a query when none was given, then apply the blocks that the
#    staged queries passed to +track+ for associations pointing at this model.
# 3. Resolve staged non-identity keys to identity values with a first
#    +multi_get+ and stage those identities.
# 4. For each identity with staged values, +multi_get+ the rows, wrap them in
#    model instances, add them through #add_records, and record a nil entry
#    in @row_keys for every key that stayed staged (i.e. was not found).
#
# The given or built query is frozen before returning on the normal path.
#
# @param model the model class to load staged keys for
# @param query query object to reuse; when nil, one is built from the class
#   registered under :query for +model.repository_name+
# @return [Set, Array] an empty Set on the early exit, otherwise an Array of
#   the model instances built in this call
# @raise [RuntimeError] when a tracked block sets a where clause for a
#   :many_to_one or :array_to_many association
def finalize_model!(model, query=nil)
  staged_queries = @staged[model].delete(:_queries) || []
  staged_keys = @staged[model].keys
  # (a previously computed `identities` local was removed here: it used `&&`
  # where an intersection was intended and was never read)
  non_identities = staged_keys - model.identities

  results = Set.new

  return results if @staged[model].all? { |(key,values)| values.empty? }

  if query.nil?
    query_class = @connection_manager.repository(model.repository_name)[:query]
    query = query_class.new(self,model)
  end

  # Apply any relevant blocks passed to track in the original queries
  staged_queries.each do |staged_query|
    staged_query.track.each do |(association_name, block)|
      next unless block

      spec = staged_query.model.associations[association_name]

      if spec[:model] == model
        query.instance_eval(&block)
        if (spec[:type] == :many_to_one || spec[:type] == :array_to_many) && query.where
          # point the backtrace at the offending track block
          file, line = block.source_location
          trace = ["#{file}:#{line}"] | caller
          raise RuntimeError, "Error finalizing model #{model.name} for association #{association_name.inspect} -- using a where clause when tracking associations of type #{spec[:type].inspect} is not supported", trace
        end
      end
    end
  end


  # process non-unique staged keys
  #   select identity (any one should do) for those keys and stage blindly
  #   load and add records.

  if non_identities.any?
    to_stage = Hash.new do |hash,identity|
      hash[identity] = Set.new
    end

    non_identities.each do |key|
      values = @staged[model].delete(key)

      rows = query.multi_get(key, values, select: model.identities)
      rows.each do |row|
        model.identities.each do |identity|
          if identity.kind_of? Array
            to_stage[identity] << row.values_at(*identity)
          else
            to_stage[identity] << row[identity]
          end
        end
      end
    end

    self.stage(model, to_stage)
  end

  model.identities.each do |identity_name|
    values = self.get_staged(model,identity_name)
    next if values.empty?

    query.where = nil # clear out any where clause from non-identity
    records = query.multi_get(identity_name, values).collect do |row|
      m = model.new(row)
      m._query = query
      m
    end

    add_records(records)

    # TODO: refactor this to better-hide queries?
    self.queries[model].add(query)

    results.merge(records)

    # add nil records for records that were not found by the multi_get
    # (missing_keys is the very Set returned by get_staged, so each delete
    # below shrinks it while it is being walked)
    missing_keys = self.get_staged(model,identity_name)
    missing_keys.each do |missing_key|
      @row_keys[model][identity_name][missing_key] = nil
      get_staged(model, identity_name).delete(missing_key)
    end

  end

  query.freeze

  # TODO: check whether really really did get all the records we should have....
  results.to_a
end

#get(model, condition) ⇒ Object



357
358
359
360
361
# File 'lib/praxis-mapper/identity_map.rb', line 357

# Looks up a single loaded record by the FIRST key/value pair of +condition+
# (any further pairs are ignored).
#
# @param model the model class to look up
# @param condition [Hash] e.g. {id: 1}
# @return the value #row_by_key returns for that key and value
# @raise [UnloadedRecordException] via #row_by_key when no entry exists
def get(model,condition)
  key, value = condition.first

  row_by_key(model, key, value)
end

#get_staged(model, key) ⇒ Object



364
365
366
# File 'lib/praxis-mapper/identity_map.rb', line 364

# Returns the live collection of staged values for +model+ and +key+ out of
# @staged. With the defaults installed by #clear!, a missing entry is created
# as an empty Set on access, and mutating the returned object changes what is
# staged.
def get_staged(model, key)
  @staged[model][key]
end

#index(model, key, value) ⇒ Object



301
302
303
304
305
306
307
308
309
310
# File 'lib/praxis-mapper/identity_map.rb', line 301

# Returns the secondary-index bucket for +value+ of +key+ on +model+,
# building the index for that key on first use.
#
# The first request for a model/key pair creates the index and fills it via
# #reindex!. The returned Array is the live bucket stored in the index (an
# empty one is created and stored when the value has no rows).
#
# @param model the model class
# @param key [Symbol, Array<Symbol>] attribute name(s) to index by
# @param value the attribute value(s) to look up
# @return [Array] the bucket of records for that value
def index(model, key, value)
  model_indexes = (@secondary_indexes[model] ||= Hash.new)

  unless model_indexes.has_key?(key)
    model_indexes[key] = Hash.new
    reindex!(model, key)
  end

  model_indexes[key][value] ||= Array.new
end

#load(model, &block) ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/praxis-mapper/identity_map.rb', line 110

# Runs a query for +model+, configured by the given block, and registers the
# resulting records in this identity map.
#
# @param model the model class to load; must report +finalized?+ as true
# @param block passed to the query class constructor
# @return the records returned by the query, or the result of
#   #finalize_model! when the query's where clause is :staged
# @raise [RuntimeError] when the model is not finalized
def load(model, &block)
  raise "Can't load unfinalized model #{model}" unless model.finalized?

  # the query class comes from the repository entry registered for the model
  query_class = @connection_manager.repository(model.repository_name)[:query]
  query = query_class.new(self, model, &block)

  # a :staged where clause means "load what is staged", reusing this query
  return finalize_model!(model, query) if query.where == :staged

  records = query.execute
  add_records(records)

  # TODO: refactor this to better-hide queries?
  query.freeze
  self.queries[model].add(query)

  # load the associations the query asked for via +load+
  subload(model, query,records)

  records
end

#persist! ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/praxis-mapper/support/factory_girl.rb', line 85

# Inserts every record flagged as new into its model's table.
#
# Models without a table name are skipped. For the others a connection is
# checked out for the model's repository, the +_data+ of all new records is
# written with a single +multi_insert+, and the records are marked as no
# longer new.
#
# @return [Hash] model => records inserted by this call (models with nothing
#   to insert are absent)
def persist!
  @rows.each_with_object({}) do |(model, records), inserted|
    table = model.table_name
    next unless table

    # checked out even when there turns out to be nothing to insert
    db = self.connection(model.repository_name)

    pending = records.select(&:new_record)
    next if pending.empty?

    db[table.to_sym].multi_insert(pending.collect(&:_data))

    pending.each { |rec| rec.new_record = false }
    inserted[model] = pending
  end
end

#query_statistics ⇒ Object



527
528
529
# File 'lib/praxis-mapper/identity_map.rb', line 527

# Wraps the recorded queries (see #queries) in a new QueryStatistics object.
#
# @return [QueryStatistics]
def query_statistics
  QueryStatistics.new(queries)
end

#reindex!(model, key) ⇒ Object



313
314
315
316
317
318
319
320
321
322
# File 'lib/praxis-mapper/identity_map.rb', line 313

# Adds every loaded row of +model+ to the secondary index for +key+.
#
# @param model the model class whose rows are indexed
# @param key [Symbol, Array<Symbol>] attribute name, or list of names for a
#   composite key (indexed under the array of the row's values)
# @return the rows of the model, as returned by #rows_for
def reindex!(model, key)
  composite = key.kind_of?(Array)

  rows_for(model).each do |row|
    indexed_value = composite ? key.collect { |k| row.send(k) } : row.send(key)
    index(model, key, indexed_value) << row
  end
end

#row_by_key(model, key, value) ⇒ Object



288
289
290
291
292
293
# File 'lib/praxis-mapper/identity_map.rb', line 288

# Returns the record indexed under +value+ for identity +key+ of +model+.
# The stored entry may be nil (a key that was queried but not found).
#
# @raise [UnloadedRecordException] when no entry exists for the value at all
def row_by_key(model,key, value)
  loaded = @row_keys[model][key]
  return loaded[value] if loaded.has_key?(value)

  raise UnloadedRecordException, "Did not load #{model} with #{key} = #{value.inspect}."
end

#rows_for(model) ⇒ Object



296
297
298
# File 'lib/praxis-mapper/identity_map.rb', line 296

# Returns the live list of loaded records for +model+ from @rows. With the
# default installed by #clear!, an empty Array is created and stored for a
# model that has none yet.
def rows_for(model)
  @rows[model]
end

#stage(model, data) ⇒ Object



369
370
371
372
373
374
375
376
377
378
379
380
381
382
# File 'lib/praxis-mapper/identity_map.rb', line 369

# Stages keys of +model+ to be loaded later.
#
# Each value (or collection of values) in +data+ is merged into the staged
# set for its key. For keys that are identities of the model, values that
# already have an entry in @row_keys are skipped.
#
# The caller's collections are never modified: already-loaded values are
# filtered into a new collection rather than removed in place, so frozen or
# shared inputs are safe to pass.
#
# @param model the model class; must respond to +identities+
# @param data [Hash] key name => single value or Enumerable of values
# @return +data+, as returned by +each+
def stage(model, data)
  data.each do |key, values|
    values = [values] unless values.kind_of? Enumerable

    # ignore rows we have already loaded... add sanity checking?
    if model.identities.include?(key)
      loaded = @row_keys[model][key]
      values = values.reject { |value| loaded.has_key?(value) }
    end

    get_staged(model, key).merge(values)
  end
end

#stage_array_to_many(tracked_association, records) ⇒ Object



439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
# File 'lib/praxis-mapper/identity_map.rb', line 439

# Computes what to stage for an :array_to_many association: each record holds
# a list of keys under the association's :key attribute, and all of them are
# gathered into one flat list.
#
# @param tracked_association [Hash] uses :key and optional :primary_key
# @param records list of records holding the key lists
# @return [Array] pair of the primary key name (default :id) and the
#   gathered keys, without nil keys or composite keys containing nil
def stage_array_to_many(tracked_association, records)
  key = tracked_association[:key]
  primary_key = tracked_association[:primary_key] || :id

  gathered = records.collect(&key).flat_map { |keys| [*keys] }
  usable = gathered.reject do |row_key|
    row_key.nil? || (row_key.kind_of?(Array) && row_key.include?(nil))
  end

  [primary_key, usable]
end

#stage_for!(spec, records) ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/praxis-mapper/identity_map.rb', line 130

# Dispatches to the staging helper matching the association type in
# +spec[:type]+.
#
# @param spec [Hash] association spec; :type selects the helper
# @param records records to extract keys from
# @return the helper's result, or nil for an unrecognized type
def stage_for!(spec, records)
  handler = {
    many_to_one: :stage_many_to_one,
    array_to_many: :stage_array_to_many,
    one_to_many: :stage_one_to_many,
    many_to_array: :stage_many_to_array
  }[spec[:type]]

  send(handler, spec, records) if handler
end

#stage_many_to_array(tracked_association, records) ⇒ Object



457
458
459
# File 'lib/praxis-mapper/identity_map.rb', line 457

# Placeholder for :many_to_array associations: always raises, both arguments
# are ignored.
#
# @raise [RuntimeError] always ("not supported yet")
def stage_many_to_array(tracked_association, records)
  raise "not supported yet"
end

#stage_many_to_one(tracked_association, records) ⇒ Object



419
420
421
422
423
424
425
426
# File 'lib/praxis-mapper/identity_map.rb', line 419

# Computes what to stage for a :many_to_one association: the values of the
# association's :key attribute on the records, to be looked up by the
# associated model's primary key.
#
# @param tracked_association [Hash] uses :key and optional :primary_key
# @param records records holding the foreign key
# @return [Array] pair of the primary key name (default :id) and the keys
#   produced by #extract_keys
def stage_many_to_one(tracked_association, records)
  [
    tracked_association[:primary_key] || :id,
    extract_keys(tracked_association[:key], records)
  ]
end

#stage_one_to_many(tracked_association, records) ⇒ Object



429
430
431
432
433
434
435
436
# File 'lib/praxis-mapper/identity_map.rb', line 429

# Computes what to stage for a :one_to_many association: the primary key
# values of the records, to be looked up through the association's :key on
# the associated model.
#
# @param tracked_association [Hash] uses :key and optional :primary_key
# @param records records whose primary keys are collected
# @return [Array] pair of the association key name and the keys produced by
#   #extract_keys for the primary key (default :id)
def stage_one_to_many(tracked_association, records)
  owner_key = tracked_association[:primary_key] || :id

  [tracked_association[:key], extract_keys(owner_key, records)]
end

#subload(model, query, records) ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/praxis-mapper/identity_map.rb', line 143

# Loads the associations requested through +query.load+ for the given
# records, recursing into each association's own +load+ requests.
#
# @param model the model class the records belong to
# @param query the query that produced the records; its +load+ yields
#   (association name, block) pairs
# @param records the records whose associations should be loaded
# @raise [KeyError] when an association name is unknown to the model
#   (Hash#fetch semantics -- assumes +associations+ is a Hash; TODO confirm)
def subload(model, query, records)
  query.load.each do |(association_name, block)|
    spec = model.associations.fetch(association_name)

    associated_model = spec[:model]

    # which key to query the associated model by, and for which values
    key, values = stage_for!(spec, records)

    # drop values that are already indexed, keeping their records for the
    # recursive call below
    # NOTE(review): a cached nil entry (a key recorded as "not found") is also
    # moved into existing_records, since `<<` returns the array and is thus
    # truthy -- confirm nil is acceptable in the recursive records list
    existing_records = []
    values.reject! do |value|
      if @row_keys[associated_model].has_key?(key) &&
          @row_keys[associated_model][key].has_key?(value)
        existing_records << @row_keys[associated_model][key][value]
      else
        false
      end
    end

    new_query_class = @connection_manager.repository(associated_model.repository_name)[:query]
    new_query = new_query_class.new(self,associated_model, &block)

    # wrap each fetched row in a model instance that remembers its query
    new_records = new_query.multi_get(key, values).collect do |row|
      m = spec[:model].new(row)
      m._query = new_query
      m
    end

    self.queries[associated_model].add(new_query)

    add_records(new_records)

    # follow nested load requests declared on the association's query
    subload(associated_model, new_query, new_records + existing_records)
  end
end