Class: Praxis::Mapper::IdentityMap

Inherits:
Object
  • Object
show all
Includes:
Praxis::Mapper::IdentityMapExtensions::Persistence
Defined in:
lib/praxis-mapper/identity_map.rb,
lib/praxis-mapper/support/factory_girl.rb

Defined Under Namespace

Classes: UnknownIdentity, UnloadedRecordException, UnsupportedModel

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Praxis::Mapper::IdentityMapExtensions::Persistence

#attach, #deindex, #detach, #flush!, #reindex, #remove

Constructor Details

#initialize(scope = {}) ⇒ IdentityMap

TODO: support multiple connections



72
73
74
75
76
77
# File 'lib/praxis-mapper/identity_map.rb', line 72

# Builds a new, empty identity map.
#
# @param scope [Hash] stored as-is in @scope
def initialize(scope={})
  @connection_manager = ConnectionManager.new
  @scope = scope
  @selector_generator = Praxis::Mapper::SelectorGenerator.new
  # clear! creates all of the per-model bookkeeping hashes, so the map starts out empty
  clear!
end

Class Attribute Details

.config ⇒ Object

Returns the value of attribute config.



20
21
22
# File 'lib/praxis-mapper/identity_map.rb', line 20

# Reader for the class-level +config+ attribute.
#
# @return [Object] the current value of @config
def config
  @config
end

Instance Attribute Details

#blueprint_cache ⇒ Object (readonly)

Returns the value of attribute blueprint_cache.



15
16
17
# File 'lib/praxis-mapper/identity_map.rb', line 15

# Read-only access to the blueprint cache.
#
# @return [Hash] the current value of @blueprint_cache
#   (clear! sets it to a hash that creates one inner Hash per blueprint class)
def blueprint_cache
  @blueprint_cache
end

#queries ⇒ Object (readonly)

Returns the value of attribute queries.



14
15
16
# File 'lib/praxis-mapper/identity_map.rb', line 14

# Read-only access to the queries recorded by this map.
#
# @return [Hash] the current value of @queries
#   (clear! sets it to a hash that creates one Set per key)
def queries
  @queries
end

#scope ⇒ Object

Returns the value of attribute scope.



17
18
19
# File 'lib/praxis-mapper/identity_map.rb', line 17

# Reader for the scope this map was constructed or set up with.
#
# @return [Hash] the current value of @scope
def scope
  @scope
end

#unloaded ⇒ Object (readonly)

Returns the value of attribute unloaded.



13
14
15
# File 'lib/praxis-mapper/identity_map.rb', line 13

# Read-only access to the +unloaded+ attribute.
#
# NOTE(review): nothing in the visible source assigns @unloaded -- confirm it is still used.
#
# @return [Object] the current value of @unloaded
def unloaded
  @unloaded
end

Class Method Details

.current ⇒ IdentityMap

Returns current identity map from thread-local variable.

Returns:

  • (IdentityMap)

    current identity map from thread-local variable



32
33
34
35
36
# File 'lib/praxis-mapper/identity_map.rb', line 32

# Looks up the identity map stored for the running thread.
#
# @return [IdentityMap] the object kept in the :_praxis_mapper_identity_map thread-local
# @raise [RuntimeError] when that thread-local is unset (or holds nil/false)
def self.current
  Thread.current[:_praxis_mapper_identity_map] || raise("current IdentityMap not set")
end

.current=(identity_map) ⇒ Object

Stores given identity map in a thread-local variable

Parameters:



26
27
28
# File 'lib/praxis-mapper/identity_map.rb', line 26

# Stores the given identity map in the :_praxis_mapper_identity_map thread-local,
# which is the slot that .current and .current? read from.
#
# @param identity_map [IdentityMap] the map to make current for this thread
def self.current=(identity_map)
  Thread.current[:_praxis_mapper_identity_map] = identity_map
end

.current? ⇒ Boolean

Returns whether identity map thread-local variable has been set.

Returns:

  • (Boolean)

    whether identity map thread-local variable has been set



40
41
42
# File 'lib/praxis-mapper/identity_map.rb', line 40

# Tells whether an identity map has been installed for the running thread.
#
# @return [Boolean] true only when the thread-local is present AND holds a
#   Praxis::Mapper::IdentityMap (or subclass) instance
def self.current?
  slot = :_praxis_mapper_identity_map
  return false unless Thread.current.key?(slot)

  Thread.current[slot].kind_of?(Praxis::Mapper::IdentityMap)
end

.setup!(scope = {}) ⇒ Object

TODO: how come scope can be set from 3 different methods?

Examples:

{:account => [:account_id, 71], :user => [:user_id, 2]}

Parameters:

  • scope (Hash) (defaults to: {})

    a set of named filters to apply in query



58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/praxis-mapper/identity_map.rb', line 58

# Installs an identity map for the current thread, or re-scopes the one that is
# already installed as long as it has not been used yet.
#
# @param scope [Hash] a set of named filters to apply in query
# @return [IdentityMap] the current identity map
# @raise [RuntimeError] when a current map exists and is not clear
def self.setup!(scope={})
  # Nothing installed yet: build a fresh map and make it current.
  return self.current = self.new(scope) unless self.current?

  existing = self.current
  raise "Denied for a pre-existing condition: Identity map has been used." unless existing.clear?

  existing.scope = scope
  self.current
end

Instance Method Details

#_finalize!(*models) ⇒ Object



214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/praxis-mapper/identity_map.rb', line 214

# Repeatedly finalizes staged models until a pass loads nothing, then releases
# the connection manager.
#
# A pass stops at the first model whose finalize_model! result is non-empty; the
# follow-up pass is started without arguments, so it covers every staged model.
#
# @param models the models to finalize first; all staged models when omitted
# @return whatever #release returns once no pass yields new records
def _finalize!(*models)
  targets = models.empty? ? @staged.keys : models

  progressed = targets.any? { |model| finalize_model!(model).any? }

  progressed ? _finalize! : release
end

#add_record(record) ⇒ Object Also known as: <<

Returns the record provided (if it was added to the identity map), or the corresponding existing record if one was already present.



520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
# File 'lib/praxis-mapper/identity_map.rb', line 520

# Registers a single record in the identity map.
#
# For every identity of the record the matching staged key is dropped and the
# record is stored in @row_keys. If any identity already maps to a (non-nil)
# record, that existing record is returned instead and +record+ is not added.
# Every secondary index that already exists for the model is updated as well.
#
# @param record a model instance responding to #identities and #identity_map=
# @return the record provided (if added to the identity map) or the
#   corresponding record if it was already present
def add_record(record)
  model = record.class
  record.identities.each do |identity, key|
    # FIXME: Should we be overwriting (possibly) a "nil" value from before?
    #        (due to that row not being found by a previous query)
    #        (That'd be odd since that means we tried to load that same identity)
    if (existing = @row_keys[model][identity][key])
      # FIXME: should merge record into existing to add any additional fields
      return existing
    end

    get_staged(model, identity).delete(key)
    @row_keys[model][identity][key] = record
  end

  @secondary_indexes[model].each do |key, indexed_values|
    val = if key.kind_of? Array
      key.collect { |k| record.send(k) }
    else
      record.send(key)
    end

    # The per-key index is a plain Hash whose buckets are only created lazily by
    # #index, so a value that has not been looked up yet has no bucket.
    # Create it here instead of calling << on nil.
    (indexed_values[val] ||= []) << record
  end

  record.identity_map = self
  @rows[model] << record
  record
end

#add_records(records) ⇒ Object



478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
# File 'lib/praxis-mapper/identity_map.rb', line 478

# Adds a batch of records (all produced by the same query) to the identity map
# and stages the keys of any associations that query asked to track.
#
# The query is taken from the first record. For each tracked association the
# query itself is remembered under :_queries and the association keys computed
# by stage_for! are collected per associated model; everything collected is
# handed to #stage after the records themselves have been added.
#
# @param records [Array] records to add; may be empty
# @return [Array] one entry per input record, as returned by #add_record
def add_records(records)
  return [] if records.empty?

  pending = Hash.new do |by_model, staged_model|
    by_model[staged_model] = Hash.new { |by_key, key_name| by_key[key_name] = Set.new }
  end

  source_query = records.first._query
  associations = source_query ? source_query.tracked_associations : []

  # First remember which query asked for each associated model...
  associations.each do |association|
    pending[association[:model]][:_queries] << source_query
  end

  # ...then gather the keys to stage. The key's bucket is only touched when
  # there is at least one row key, so empty key sets are never staged.
  associations.each do |association|
    target_model = association[:model]
    association_key, row_keys = stage_for!(association, records)

    row_keys.each { |row_key| pending[target_model][association_key].add(row_key) }
  end

  added = records.map { |record| add_record(record) }

  pending.each { |model_to_stage, data| stage(model_to_stage, data) }

  added
end

#add_selectors(resource, fields) ⇒ Object



556
557
558
# File 'lib/praxis-mapper/identity_map.rb', line 556

# Forwards the resource/fields pair to the map's selector generator.
#
# @param resource passed through to SelectorGenerator#add
# @param fields passed through to SelectorGenerator#add
# @return whatever the selector generator's #add returns
def add_selectors(resource, fields)
  @selector_generator.add(resource, fields)
end

#all(model, conditions = {}) ⇒ Object



357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
# File 'lib/praxis-mapper/identity_map.rb', line 357

# Returns the loaded records of +model+, optionally filtered by one condition.
#
# Only the FIRST key/value pair of +conditions+ is used. When the key is one
# that @row_keys tracks for the model, records are looked up there; otherwise
# a secondary index on that key is used (and built on first use via #index).
#
# @param model the model class to look up
# @param conditions [Hash] e.g. { id: [1, 2] }, { id: 1 } or { id: Set[1, 2] }
# @return [Array] the matching records (empty when nothing matches). For a single
#   value on a non-identity key this is the array returned by #index.
def all(model,conditions={})
  return rows_for(model) if conditions.empty?

  key, values = conditions.first

  # Accept a bare value or any collection that converts with to_a.
  # Without this a scalar breaks (e.g. Integer#size is a byte count, and there
  # is no Integer#collect) and a Set fails on values[0].
  values = Array(values)

  # optimize the common case of a single value
  if values.size == 1
    value = values[0]
    if @row_keys[model].has_key?(key)
      res = @row_keys[model][key][value]

      if res
        [res]
      else
        []
      end
    else
      index(model, key, value)
    end
  else
    if @row_keys[model].has_key?(key)
      values.collect do |v|
        @row_keys[model][key][v]
      end.compact
    else
      values.each_with_object(Array.new) do |v, results|
        results.push(*index(model, key, v))
      end
    end
  end
end

#clear! ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/praxis-mapper/identity_map.rb', line 83

# Resets every bookkeeping structure of the map to a fresh, empty state.
# All of them are hashes that create their per-model entry on first access.
def clear!
  # model => Array of records
  @rows = Hash.new { |rows, model| rows[model] = [] }

  # model => { identity name => Set of staged values }, e.g.
  #   { Instance => { :id => Set[1, 2, 3], :name => Set["George Jr."] } }
  @staged = Hash.new do |staged, model|
    staged[model] = Hash.new { |by_identity, identity_name| by_identity[identity_name] = Set.new }
  end

  # model => { key name => { key value => record } }, e.g.
  #   { "instances" => { :id => { 1 => record } } }
  @row_keys = Hash.new do |by_model, model|
    by_model[model] = Hash.new { |by_key, key_name| by_key[key_name] = {} }
  end

  # key => Set of queries
  @queries = Hash.new { |by_model, model| by_model[model] = Set.new }

  # see how it feels to store blueprints here, e.g.
  #   @blueprint_cache[User][some_object] = User.new(some_object)
  @blueprint_cache = Hash.new { |cache, blueprint_class| cache[blueprint_class] = {} }

  # TODO: rework this so it's a hash with default values and simplify #index
  @secondary_indexes = Hash.new { |indexes, model| indexes[model] = {} }
end

#clear? ⇒ Boolean

Returns:

  • (Boolean)


45
46
47
48
49
50
# File 'lib/praxis-mapper/identity_map.rb', line 45

# Tells whether the map is still unused.
#
# @return [Boolean] true when @rows, @staged, @row_keys and @queries are all empty
def clear?
  [@rows, @staged, @row_keys, @queries].all?(&:empty?)
end

#connection(name) ⇒ Object



417
418
419
# File 'lib/praxis-mapper/identity_map.rb', line 417

# Checks out the named connection from the map's connection manager.
#
# @param name the connection/repository name handed to ConnectionManager#checkout
# @return whatever the connection manager's #checkout returns
def connection(name)
  @connection_manager.checkout(name)
end

#extract_keys(field, records) ⇒ Object



421
422
423
424
425
426
427
428
429
430
431
432
# File 'lib/praxis-mapper/identity_map.rb', line 421

# Collects the distinct values of +field+ across +records+.
#
# @param field [Symbol, Array<Symbol>] an attribute name, or an array of names
#   for a composite identity
# @param records [Enumerable] objects responding to the named attribute(s)
# @return [Set] the values found; nil values (or, for composite fields, tuples
#   containing a nil) are left out
def extract_keys(field, records)
  candidates = if field.kind_of?(Array) # composite identities
    tuples = records.map { |record| field.map { |col| record.send(col) } }
    tuples.reject { |tuple| tuple.include?(nil) }
  else
    records.map(&field).compact
  end

  Set.new(candidates)
end

#finalize!(*models_and_opts) ⇒ Object

The last element of the argument array may be an options hash (e.g. `instrument: false`). It is implemented this way (instead of `(*models, instrument: true)`) because, when Sequel models are passed in, Ruby invokes `.to_hash` on them while trying to destructure the arguments, which causes a “load”.



194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# File 'lib/praxis-mapper/identity_map.rb', line 194

# Public entry point for finalizing staged models, optionally wrapped in an
# ActiveSupport notification.
#
# The options are taken from a trailing Hash argument rather than from keyword
# arguments (see the note in the method description about Sequel models).
#
# @param models_and_opts the models to finalize, optionally followed by an
#   options Hash; its :instrument entry (default true) controls whether the
#   work is reported as 'praxis.mapper.finalize'
# @return whatever #_finalize! returns
def finalize!(*models_and_opts)
  opts = models_and_opts.last.kind_of?(::Hash) ? models_and_opts.last : nil

  models = opts ? models_and_opts[0..-2] : models_and_opts
  instrument = opts ? opts.fetch(:instrument, true) : true

  return _finalize!(*models) unless instrument

  ActiveSupport::Notifications.instrument('praxis.mapper.finalize') { _finalize!(*models) }
end

#finalize_model!(model, query = nil) ⇒ Object

Do not document; never call this yourself! FIXME: make private and fix the specs that break?



234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
# File 'lib/praxis-mapper/identity_map.rb', line 234

# Loads everything currently staged for +model+ and adds the records to the map.
# (Internal: not meant to be called directly. FIXME: make private.)
#
# Steps, as implemented below:
#   1. take the tracking queries stashed under :_queries off the staged hash;
#   2. return early when no staged key has any values;
#   3. re-apply the blocks given to +track+ for associations that point at +model+;
#   4. resolve staged non-identity keys into identity values and stage those;
#   5. multi_get per identity, add the records, and record unfound keys as nil.
#
# @param model the model class whose staged keys are loaded
# @param query optional query to reuse; when nil, one is built from the query
#   class of the model's repository
# @return [Array] the records returned by add_records for the loaded rows
#   NOTE(review): the early return below hands back an empty Set, not an Array.
# @raise [RuntimeError] when a tracked block sets a where clause for a
#   :many_to_one or :array_to_many association
def finalize_model!(model, query = nil)
  staged_queries = @staged[model].delete(:_queries) || []
  staged_keys = @staged[model].keys
  # staged keys that are not identities of the model need an extra lookup (below)
  non_identities = staged_keys - model.identities

  results = Set.new

  # nothing staged (or only empty sets): no query needed
  return results if @staged[model].all? { |(_key, values)| values.empty? }

  if query.nil?
    query_class = @connection_manager.repository(model.repository_name)[:query]
    query = query_class.new(self, model)
  end

  # Apply any relevant blocks passed to track in the original queries
  staged_queries.each do |staged_query|
    staged_query.track.each do |(association_name, block)|
      next unless block

      spec = staged_query.model.associations[association_name]

      if spec[:model] == model
        query.instance_eval(&block)
        if (spec[:type] == :many_to_one || spec[:type] == :array_to_many) && query.where
          # point the backtrace at the offending track block
          file, line = block.source_location
          trace = ["#{file}:#{line}"] | caller
          raise RuntimeError, "Error finalizing model #{model.name} for association #{association_name.inspect} -- using a where clause when tracking associations of type #{spec[:type].inspect} is not supported", trace
        end
      end
    end
  end


  # process non-unique staged keys
  #   select identity (any one should do) for those keys and stage blindly
  #   load and add records.

  if non_identities.any?
    to_stage = Hash.new do |hash,identity|
      hash[identity] = Set.new
    end

    non_identities.each do |key|
      values = @staged[model].delete(key)

      # raw rows carrying only the identity columns
      rows = query.multi_get(key, values, select: model.identities, raw: true)
      rows.each do |row|
        model.identities.each do |identity|
          if identity.kind_of? Array
            # composite identity: stage the tuple of its column values
            to_stage[identity] << row.values_at(*identity)
          else
            to_stage[identity] << row[identity]
          end
        end
      end
    end

    self.stage(model, to_stage)
  end

  model.identities.each do |identity_name|
    values = self.get_staged(model,identity_name)
    next if values.empty?

    query.where = nil # clear out any where clause from non-identity
    records = query.multi_get(identity_name, values)

    # TODO: refactor this to better-hide queries?
    self.queries[model].add(query)

    results.merge(add_records(records))

    # add nil records for records that were not found by the multi_get
    # (add_record removes each found key from the staged set, so whatever is
    # still staged at this point was not returned)
    missing_keys = self.get_staged(model,identity_name)
    missing_keys.each do |missing_key|
      @row_keys[model][identity_name][missing_key] = nil
      get_staged(model, identity_name).delete(missing_key)
    end

  end

  query.freeze

  # TODO: check whether really really did get all the records we should have....
  results.to_a
end

#get(model, condition) ⇒ Object



390
391
392
393
394
# File 'lib/praxis-mapper/identity_map.rb', line 390

# Fetches one already-loaded record by a single key/value condition.
# Only the first pair of +condition+ is considered.
#
# @param model the model class to look up
# @param condition [Hash] e.g. { id: 1 }
# @return whatever #row_by_key returns for that model, key and value
def get(model,condition)
  lookup_key, lookup_value = condition.first

  row_by_key(model, lookup_key, lookup_value)
end

#get_staged(model, key) ⇒ Object



397
398
399
# File 'lib/praxis-mapper/identity_map.rb', line 397

# Returns the staged values for one key of a model.
#
# @param model the model class
# @param key the identity (or other staged key) name
# @return the entry of @staged for that model and key
#   (with the hash built by clear!, a Set that is created on first access)
def get_staged(model, key)
  @staged[model][key]
end

#index(model, key, value) ⇒ Object



334
335
336
337
338
339
340
341
# File 'lib/praxis-mapper/identity_map.rb', line 334

# Returns the secondary-index bucket for +value+ of +key+ on +model+.
#
# The index for a key is built lazily: the first lookup on that key registers it
# and runs reindex! over the rows already loaded. Buckets are created on demand.
#
# @param model the model class
# @param key [Symbol, Array<Symbol>] the indexed attribute (or attributes)
# @param value the attribute value (or tuple of values) to look up
# @return [Array] the stored bucket itself (not a copy)
def index(model, key, value)
  model_indexes = @secondary_indexes[model]

  unless model_indexes.has_key?(key)
    # register the key before reindexing: reindex! calls back into #index
    model_indexes[key] ||= {}
    reindex!(model, key)
  end

  model_indexes[key][value] ||= []
end

#load(model, &block) ⇒ Object



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/praxis-mapper/identity_map.rb', line 122

# Runs a query for +model+ and adds the resulting records to the identity map.
#
# @param model the model class to load; must report finalized?
# @param block handed to the repository's query class when building the query
# @return the records as returned by add_records, or the result of
#   finalize_model! when the query's where is :staged
# @raise [RuntimeError] when the model is not finalized
def load(model, &block)
  raise "Can't load unfinalized model #{model}" unless model.finalized?

  query_class = @connection_manager.repository(model.repository_name)[:query]
  query = query_class.new(self, model, &block)

  # a where of :staged means "load whatever is staged for this model":
  # drop the marker and delegate to finalize_model! with this query
  if query.where == :staged
    query.where = nil
    return finalize_model!(model, query)
  end

  ActiveSupport::Notifications.instrument 'praxis.mapper.load', model: model do
    records = query.execute
    im_records = add_records(records)

    # TODO: refactor this to better-hide queries?
    query.freeze
    queries[model].add(query)

    # follow the associations the query asked to load, starting from the fetched records
    subload(model, query, records)

    im_records
  end
end

#persist! ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/praxis-mapper/support/factory_girl.rb', line 81

# Inserts every new record held by the map into its model's table.
#
# Models without a table_name are skipped. For the others a connection is
# obtained for the model's repository, the records flagged new_record are
# bulk-inserted via multi_insert, and the flag is then cleared on each of them.
#
# @return [Hash] model => Array of the records that were inserted
def persist!
  inserted = {}

  @rows.each do |model, records|
    table = model.table_name
    next unless table

    db = self.connection(model.repository_name)

    fresh = records.select(&:new_record)
    next if fresh.empty?

    db[table.to_sym].multi_insert(fresh.map(&:_data))

    fresh.each { |rec| rec.new_record = false }
    inserted[model] = fresh
  end

  inserted
end

#query_statistics ⇒ Object



552
553
554
# File 'lib/praxis-mapper/identity_map.rb', line 552

# Wraps the queries recorded by this map in a QueryStatistics object.
#
# @return [QueryStatistics] built from #queries
def query_statistics
  QueryStatistics.new(queries)
end

#reindex!(model, key) ⇒ Object



344
345
346
347
348
349
350
351
352
353
354
# File 'lib/praxis-mapper/identity_map.rb', line 344

# Adds every loaded row of +model+ to the secondary index for +key+.
#
# @param model the model class whose rows are indexed
# @param key [Symbol, Array<Symbol>] attribute name, or names for a composite index
# @return the collection returned by rows_for(model)
def reindex!(model, key)
  value_of = if key.kind_of?(Array)
    ->(row) { key.map { |part| row.send(part) } }
  else
    ->(row) { row.send(key) }
  end

  # FIXME: make this a set? or handle duplicates better
  rows_for(model).each { |row| index(model, key, value_of.call(row)) << row }
end

#release ⇒ Object



228
229
230
# File 'lib/praxis-mapper/identity_map.rb', line 228

# Asks the connection manager to release its connections.
#
# @return whatever ConnectionManager#release returns
def release
  @connection_manager.release
end

#row_by_key(model, key, value) ⇒ Object



322
323
324
325
326
# File 'lib/praxis-mapper/identity_map.rb', line 322

# Looks up a record by key and value, insisting that a load was attempted.
#
# @param model the model class
# @param key the identity name
# @param value the identity value
# @return the stored entry for that value; this may be nil when the key is
#   present but holds nil
# @raise [UnloadedRecordException] when no entry exists for the value at all
def row_by_key(model,key, value)
  loaded = @row_keys[model][key]
  return loaded[value] if loaded.has_key?(value)

  raise UnloadedRecordException, "Did not load #{model} with #{key} = #{value.inspect}."
end

#rows_for(model) ⇒ Object



329
330
331
# File 'lib/praxis-mapper/identity_map.rb', line 329

# Returns the records held for a model.
#
# @param model the model class
# @return the entry of @rows for that model
#   (with the hash built by clear!, an Array that is created on first access)
def rows_for(model)
  @rows[model]
end

#selectors ⇒ Object



79
80
81
# File 'lib/praxis-mapper/identity_map.rb', line 79

# Exposes the selectors accumulated by the map's selector generator.
#
# @return whatever SelectorGenerator#selectors returns
def selectors
  @selector_generator.selectors
end

#stage(model, data) ⇒ Object



402
403
404
405
406
407
408
409
410
411
412
413
414
415
# File 'lib/praxis-mapper/identity_map.rb', line 402

# Stages key values of +model+ so that a later finalize can load them.
#
# For keys that are identities of the model, values that already have an entry
# in @row_keys (loaded, or known to be missing) are skipped. The caller's
# collections are never modified, so frozen or shared arrays/sets may be passed.
#
# @param model the model class; must respond to #identities
# @param data [Hash] key name => a single value or an Enumerable of values
# @return [Hash] the +data+ argument
def stage(model, data)
  data.each do |key, values|
    unless values.kind_of? Enumerable
      values = [values]
    end

    # ignore rows we have already loaded... add sanity checking?
    if model.identities.include?(key)
      # non-destructive reject: a reject! here would edit the caller's object
      values = values.reject { |k| @row_keys[model][key].has_key? k }
    end

    get_staged(model,key).merge(values)
  end
end

#stage_array_to_many(tracked_association, records) ⇒ Object



455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
# File 'lib/praxis-mapper/identity_map.rb', line 455

# Computes the keys to stage for an :array_to_many association, where each
# record holds a collection of keys of the associated model.
#
# @param tracked_association [Hash] association spec; :key names the attribute
#   holding the collection, :primary_key (default :id) names the identity on
#   the associated model
# @param records [Enumerable] the records owning the association
# @return [Array(Object, Set)] the associated identity name and the set of key
#   values; nil keys and composite keys containing nil are left out
def stage_array_to_many(tracked_association, records)
  key = tracked_association[:key]
  primary_key = tracked_association[:primary_key] || :id

  row_keys = Set.new
  records.collect(&key).each do |keys|
    # a record with no collection at all contributes nothing
    # (Set#merge raises ArgumentError when given nil)
    next if keys.nil?

    row_keys.merge keys
  end

  row_keys.reject! do |row_key|
    row_key.nil? || (row_key.kind_of?(Array) && row_key.include?(nil))
  end

  [primary_key, row_keys]
end

#stage_for!(spec, records) ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/praxis-mapper/identity_map.rb', line 147

# Dispatches to the staging helper that matches the association type.
#
# @param spec [Hash] association spec; spec[:type] selects the helper
# @param records the records owning the association
# @return the helper's result, or nil when spec[:type] is not one of
#   :many_to_one, :array_to_many, :one_to_many or :many_to_array
def stage_for!(spec, records)
  handlers = {
    many_to_one: :stage_many_to_one,
    array_to_many: :stage_array_to_many,
    one_to_many: :stage_one_to_many,
    many_to_array: :stage_many_to_array
  }

  handler = handlers[spec[:type]]
  handler && send(handler, spec, records)
end

#stage_many_to_array(tracked_association, records) ⇒ Object



473
474
475
# File 'lib/praxis-mapper/identity_map.rb', line 473

# Placeholder for staging :many_to_array associations, which is not implemented.
#
# @param tracked_association unused
# @param records unused
# @raise [RuntimeError] always, with the message "not supported yet"
def stage_many_to_array(tracked_association, records)
  raise "not supported yet"
end

#stage_many_to_one(tracked_association, records) ⇒ Object



435
436
437
438
439
440
441
442
# File 'lib/praxis-mapper/identity_map.rb', line 435

# Computes the keys to stage for a :many_to_one association: the values of the
# records' foreign key, to be looked up by the associated model's primary key.
#
# @param tracked_association [Hash] association spec; :key names the foreign key
#   on the records, :primary_key (default :id) the identity on the associated model
# @param records the records owning the association
# @return [Array] the associated identity name and the result of extract_keys
def stage_many_to_one(tracked_association, records)
  foreign_key = tracked_association[:key]
  target_identity = tracked_association[:primary_key] || :id

  [target_identity, extract_keys(foreign_key, records)]
end

#stage_one_to_many(tracked_association, records) ⇒ Object



445
446
447
448
449
450
451
452
# File 'lib/praxis-mapper/identity_map.rb', line 445

# Computes the keys to stage for a :one_to_many association: the values of the
# records' own primary key, to be looked up by the association's key on the
# associated model.
#
# @param tracked_association [Hash] association spec; :key names the attribute
#   on the associated model, :primary_key (default :id) the identity on the records
# @param records the records owning the association
# @return [Array] the association key name and the result of extract_keys
def stage_one_to_many(tracked_association, records)
  remote_key = tracked_association[:key]
  own_identity = tracked_association[:primary_key] || :id

  [remote_key, extract_keys(own_identity, records)]
end

#subload(model, query, records) ⇒ Object



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/praxis-mapper/identity_map.rb', line 160

# Eagerly loads the associations requested through +query.load+ for +records+,
# recursing into the loads requested on each associated query.
#
# For each association the keys are computed with stage_for!, keys that already
# have an entry in @row_keys are taken from the map instead of being queried
# again, and the remaining keys are fetched with a multi_get on a new query for
# the associated model.
#
# @param model the model class of +records+
# @param query the query that produced +records+; its #load lists the
#   associations (name and optional block) to follow
# @param records the records whose associations are loaded
def subload(model, query, records)
  query.load.each do |(association_name, block)|
    spec = model.associations.fetch(association_name)

    associated_model = spec[:model]

    key, values = stage_for!(spec, records)

    existing_records = []
    values.reject! do |value|
      next false unless @row_keys[associated_model].has_key?(key) &&
          @row_keys[associated_model][key].has_key?(value)

      # A nil entry marks a key that was looked up before and not found.
      # It needs no new query, but it is not a record either: passing it on
      # would make the nested subload call attribute readers on nil.
      existing = @row_keys[associated_model][key][value]
      existing_records << existing unless existing.nil?
      true
    end

    new_query_class = @connection_manager.repository(associated_model.repository_name)[:query]
    new_query = new_query_class.new(self,associated_model, &block)

    new_records = new_query.multi_get(key, values)

    self.queries[associated_model].add(new_query)

    add_records(new_records)

    subload(associated_model, new_query, new_records + existing_records)
  end
end