Class: HealthDataStandards::Import::Bundle::Importer

Inherits:
Object
  • Object
show all
Defined in:
lib/health-data-standards/import/bundle/importer.rb

Constant Summary collapse

# Locations inside the bundle zip, keyed by content type. Values are either a
# single file name, a directory name, or a glob pattern handed to the zip's glob.
SOURCE_ROOTS = {
  bundle: 'bundle.json',
  libraries: File.join('library_functions', '*.js'),
  measures: 'measures',
  results: 'results',
  valuesets: File.join('value_sets', 'json', '*.json'),
  patients: 'patients'
}

# Names of the database collections this importer works with. Used below as the
# default value of DEFAULTS[:clear_collections].
COLLECTION_NAMES = %w[bundles records measures selected_measures patient_cache query_cache system.js]

# NOTE(review): not referenced by the code visible here; presumably collections
# that are emptied but never dropped - confirm against the callers.
CLEAR_ONLY_COLLECTIONS = %w[system.js]

# Option defaults; .import merges the caller-supplied options over these.
DEFAULTS = {
  type: nil,
  delete_existing: false,
  update_measures: true,
  clear_collections: COLLECTION_NAMES
}

Class Method Summary collapse

Class Method Details

.compare_and_update_entries(patient, reference_id, start_date, end_date, codes) ⇒ Object

This is a bit of a hack: two entries are considered equal when their dates and codes match.



172
173
174
175
176
177
178
179
180
181
# File 'lib/health-data-standards/import/bundle/importer.rb', line 172

# Re-points matching patient entries at a referenced entry id.
#
# Bit of a hack: an entry is treated as "the same" as the referenced one when
# compare_dates accepts its dates and its codes equal +codes+.
#
# @param patient      object exposing +entries+ (each with +codes+ and a writable +_id+)
# @param reference_id [String] id string converted with BSON::ObjectId.from_string
# @param start_date   start date passed through to compare_dates
# @param end_date     end date passed through to compare_dates
# @param codes        codes that a matching entry must equal
# @return the value of +patient.entries.each+
def self.compare_and_update_entries(patient, reference_id, start_date, end_date, codes)
  patient.entries.each do |candidate|
    # Dates are checked first, codes second; both must agree.
    next unless compare_dates(candidate, start_date, end_date)
    next unless candidate.codes == codes

    # Replace the entry's id with the original (referenced) id.
    candidate._id = BSON::ObjectId.from_string(reference_id)
  end
end

.compare_dates(entry, start_date, end_date) ⇒ Object



183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/health-data-standards/import/bundle/importer.rb', line 183

# Checks whether an entry's start/end times correspond to the given dates.
#
# The entry's times are multiplied by 1000 before being compared
# (presumably entry times are in seconds and the dates in milliseconds -
# confirm against the bundle format).
#
# An entry without an end time only matches when +end_date+ is nil as well.
# An entry with an end time must match +end_date+ exactly; previously any
# non-nil end time was accepted regardless of +end_date+.
#
# @param entry      object exposing +start_time+ and +end_time+
# @param start_date expected start, on the scale of +start_time * 1000+
# @param end_date   expected end on the same scale, or nil for an open-ended entry
# @return [Boolean] true when both start and end agree
def self.compare_dates(entry, start_date, end_date)
  return false unless entry.start_time * 1000 == start_date
  return end_date.nil? if entry.end_time.nil?

  entry.end_time * 1000 == end_date
end

.entry_key(original, extension) ⇒ Object

A utility function for finding files in a bundle. Strips the extension from a file path and returns just the filename.

Parameters:

  • original (String)

    A file path.

  • extension (String)

    A file extension.

Returns:

  • The filename at the end of the original String path with the extension removed. e.g. “/boo/urns.html” -> “urns”



95
96
97
# File 'lib/health-data-standards/import/bundle/importer.rb', line 95

# A utility function for finding files in a bundle: reduces a file path to its
# filename with the given extension removed.
#
# Only a trailing ".<extension>" is stripped; the same text occurring earlier
# in the filename is left intact. An empty path yields an empty string.
#
# @param original  [String] a '/'-separated file path
# @param extension [String] a file extension without the leading dot
# @return [String] the last path segment without the extension,
#   e.g. "/boo/urns.html" -> "urns"
def self.entry_key(original, extension)
  suffix = ".#{extension}"
  # split('/').last is nil for an empty path, hence the to_s.
  filename = original.split('/').last.to_s
  filename.end_with?(suffix) ? filename[0...-suffix.length] : filename
end

.import(zip, options = {}) ⇒ Object

Import a quality bundle into the database. This includes metadata, measures, test patients, supporting JS libraries, and expected results.

Parameters:

  • zip (File)

    The bundle zip file.

  • type (String)

    The type of measures to import: either ‘ep’ or ‘eh’, or nil for all.

  • delete_existing (Boolean)

    If true, delete any existing bundle with the same version before importing. If false, importing a version that already exists raises an error.



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/health-data-standards/import/bundle/importer.rb', line 26

# Import a quality bundle into the database: bundle metadata, measures, test
# patients, value sets, supporting JS libraries and expected results.
#
# Options (merged over DEFAULTS):
#   :type              - measure type sub-directory to import ('ep', 'eh'), nil for all
#   :delete_existing   - when true, bundles with the same version are deleted first;
#                        when false, an already-installed version raises
#   :update_measures   - forwarded to unpack_and_store_measures
#   :exclude_results   - when true, patients and results are not imported
#   :force_js_install  - install the JS libraries even if a newer bundle is
#                        installed; accepted as a Symbol key (consistent with
#                        the other options) or as the String key
#                        "force_js_install" (the only form honoured before)
#
# @param zip     object responding to +path+, pointing at the bundle zip file
# @param options [Hash] see above
# @return the bundle object built from the zip's metadata
# @raise [RuntimeError] if the bundle version already exists and
#   :delete_existing is not set, or if the bundle metadata cannot be saved
def self.import(zip, options = {})
  options = DEFAULTS.merge(options)
  force_js_install = options[:force_js_install] || options["force_js_install"]

  bundle = nil
  Zip::ZipFile.open(zip.path) do |zip_file|
    bundle = unpack_bundle(zip_file)

    installed_versions = HealthDataStandards::CQM::Bundle.where({}).map(&:version)
    if installed_versions.include?(bundle.version) && !options[:delete_existing]
      raise "A bundle with version #{bundle.version} already exists in the database. "
    end

    if options[:delete_existing]
      HealthDataStandards::CQM::Bundle.where({:version => bundle.version}).each do |b|
        puts "deleting existing bundle version: #{b.version}"
        b.delete
      end
    end

    # Find the highest installed bundle version (as seen before any deletion
    # above) and only load the JS libraries when it is not greater than the
    # version being installed, unless forced.
    # NOTE(review): versions are compared with their own <=, which is
    # lexicographic if they are Strings ("2.10" < "2.9") - confirm the format.
    newest_installed = installed_versions.max
    if newest_installed.nil? || newest_installed <= bundle.version || force_js_install
      unpack_and_store_system_js(zip_file)
    else
      puts "javascript libraries will not being updated as a more recent bundle version is already installed"
    end

    # Store the bundle metadata.
    raise bundle.errors.full_messages.join(",") unless bundle.save
    puts "bundle metadata unpacked..."

    measure_ids = unpack_and_store_measures(zip_file, options[:type], bundle, options[:update_measures])
    unpack_and_store_patients(zip_file, options[:type], bundle) unless options[:exclude_results]
    unpack_and_store_valuesets(zip_file, bundle)
    unpack_and_store_results(zip_file, options[:type], measure_ids, bundle) unless options[:exclude_results]
  end

  bundle
ensure
  # If the bundle is nil or has never been saved, do not set done_importing or
  # save; otherwise mark it done even when a later import step raised.
  if bundle && bundle.created_at
    bundle.done_importing = true
    bundle.save
  end
end

.reconnect_references(patient, source_data_with_references, source_data_reference_id_hash, source_data_id_hash) ⇒ Object



198
199
200
201
202
203
204
205
206
207
208
# File 'lib/health-data-standards/import/bundle/importer.rb', line 198

# Re-establishes entry ids for every source data criterion that references
# other criteria.
#
# @param patient                       object indexable by 'source_data_criteria'
# @param source_data_with_references   [Array<Integer>] positions, within the
#   patient's source data criteria, of the criteria that carry references
# @param source_data_reference_id_hash [Hash] criteria_id => array of referenced criteria ids
# @param source_data_id_hash           [Hash] criteria_id => position in the source data criteria
# @return the value of +source_data_with_references.each+
def self.reconnect_references(patient, source_data_with_references, source_data_reference_id_hash, source_data_id_hash)
  source_data_with_references.each do |criteria_index|
    # Only criteria that actually hold references are visited.
    referencing = patient['source_data_criteria'][criteria_index]
    referenced_ids = source_data_reference_id_hash[referencing['criteria_id']]

    referenced_ids.each do |referenced_id|
      target_index = source_data_id_hash[referenced_id]
      target = patient['source_data_criteria'][target_index]
      compare_and_update_entries(
        patient,
        target['coded_entry_id'],
        target['start_date'],
        target['end_date'],
        target['codes']
      )
    end
  end
end

.report_progress(label, percent) ⇒ Object



253
254
255
256
# File 'lib/health-data-standards/import/bundle/importer.rb', line 253

# Prints a progress line for the current import stage.
#
# The line starts with a carriage return and has no trailing newline, so each
# report is written over the previous one; STDOUT is flushed afterwards.
#
# @param label   name of the stage being loaded
# @param percent completion percentage to display
# @return the result of STDOUT.flush
def self.report_progress(label, percent)
  status_line = "\rLoading: #{label} #{percent}% complete"
  print(status_line)
  STDOUT.flush
end

.save_system_js_fn(name, fn) ⇒ Object

Save a javascript function into Mongo’s system.js collection for measure execution.

Parameters:

  • name (String)

    The name by which the function will be referred.

  • fn (String)

    The body of the function being saved.



79
80
81
82
83
84
85
86
87
88
# File 'lib/health-data-standards/import/bundle/importer.rb', line 79

# Save a javascript function into Mongo's system.js collection for measure
# execution.
#
# The body is wrapped in an anonymous "function () { ... }" and upserted under
# the given name, so an existing function with that _id is replaced.
#
# @param name [String] the name by which the function will be referred
# @param fn   [String] the body of the function being saved
# @return the result of the collection's replace_one call
def self.save_system_js_fn(name, fn)
  wrapped_source = "function () {\n #{fn} \n }"
  selector = { "_id" => name }
  replacement = {
    "_id" => name,
    "value" => BSON::Code.new(wrapped_source)
  }
  Mongoid.default_client['system.js'].replace_one(selector, replacement, { upsert: true })
end

.unpack_and_store_measures(zip, type, bundle, update_measures) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/health-data-standards/import/bundle/importer.rb', line 111

# Inserts every measure JSON file found in the bundle into the "measures"
# collection, tagged with the bundle's id.
#
# When +update_measures+ is truthy, every stored measure with the same
# hqmf_id/sub_id whose owning bundle has a lower version than +bundle+ is
# merged with the freshly unpacked measure and written back.
#
# @param zip             opened bundle zip (responds to +glob+)
# @param type            measure type sub-directory, or nil to search all ('**')
# @param bundle          bundle being imported (responds to +id+ and +version+)
# @param update_measures whether measures of older bundles are refreshed
# @return [Array] the 'id' of every measure that was unpacked
def self.unpack_and_store_measures(zip, type, bundle, update_measures)
  pattern = File.join(SOURCE_ROOTS[:measures], type || '**', '*.json')
  entries = zip.glob(pattern)
  measure_ids = []

  entries.each_with_index do |entry, position|
    source_measure = unpack_json(entry)
    # Work on a clone so source_measure stays free of the bundle id.
    measure = source_measure.clone
    measure_ids << measure['id']
    measure['bundle_id'] = bundle.id
    Mongoid.default_client["measures"].insert_one(measure)

    if update_measures
      same_measure = { hqmf_id: measure["hqmf_id"], sub_id: measure["sub_id"] }
      Mongoid.default_client["measures"].find(same_measure).each do |stored|
        owner = HealthDataStandards::CQM::Bundle.find(stored["bundle_id"])
        next unless owner.version < bundle.version

        stored.merge!(source_measure)
        Mongoid.default_client["measures"].update_one({ "_id" => stored["_id"] }, stored)
      end
    end

    # Report on every tenth measure only.
    report_progress('measures', position * 100 / entries.length) if (position % 10).zero?
  end

  puts "\rLoading: Measures Complete          "
  measure_ids
end

.unpack_and_store_patients(zip, type, bundle) ⇒ Object



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/health-data-standards/import/bundle/importer.rb', line 137

# Creates and saves a Record for every patient JSON file in the bundle, tagged
# with the bundle's id, re-establishing entry ids for source data criteria
# that reference other criteria.
#
# @param zip    opened bundle zip (responds to +glob+)
# @param type   patient type sub-directory, or nil to search all ('**')
# @param bundle bundle being imported (responds to +id+)
# @return [nil]
def self.unpack_and_store_patients(zip, type, bundle)
  entries = zip.glob(File.join(SOURCE_ROOTS[:patients], type || '**', 'json', '*.json'))
  entries.each_with_index do |entry, index|
    patient = Record.new(unpack_json(entry))
    patient['bundle_id'] = bundle.id

    source_data_with_references = []   # positions of criteria holding references
    source_data_reference_id_hash = {} # criteria_id => referenced criteria ids
    source_data_id_hash = {}           # criteria_id => position

    # Loop through the source data criteria; where there are references, record
    # their ids. The position uses its own variable: reusing +index+ here
    # used to clobber the patient index that drives progress reporting.
    # A patient without source data criteria is treated as having none.
    (patient['source_data_criteria'] || []).each_with_index do |data_criteria, criteria_index|
      source_data_id_hash[data_criteria['criteria_id']] = criteria_index
      references = data_criteria['references']
      next if references.nil?

      source_data_with_references << criteria_index
      source_data_reference_id_hash[data_criteria['criteria_id']] =
        references.map { |reference| reference['reference_id'] }
    end

    # If there are references, the id references are re-established.
    unless source_data_with_references.empty?
      reconnect_references(patient, source_data_with_references, source_data_reference_id_hash, source_data_id_hash)
    end

    patient.save
    # Report on every tenth patient only.
    report_progress('patients', index * 100 / entries.length) if (index % 10).zero?
  end
  puts "\rLoading: Patients Complete          "
end

.unpack_and_store_results(zip, type, measure_ids, bundle) ⇒ Object



221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
# File 'lib/health-data-standards/import/bundle/importer.rb', line 221

# Loads the expected-results JSON files of the bundle into the cache
# collections: the file named "by_patient" goes to "patient_cache", every
# other file to "query_cache". Each document is tagged with the bundle's id.
#
# When +type+ is given, only documents whose measure id is listed in
# +measure_ids+ are kept.
#
# @param zip         opened bundle zip (responds to +glob+)
# @param type        measure type being imported, or nil for no filtering
# @param measure_ids ids of the measures that were imported
# @param bundle      bundle being imported (responds to +id+)
# @return [nil]
def self.unpack_and_store_results(zip, type, measure_ids, bundle)
  result_files = zip.glob(File.join(SOURCE_ROOTS[:results], '*.json'))
  result_files.each do |result_file|
    name = Pathname.new(result_file.name).basename('.json').to_s
    by_patient = name == "by_patient"
    collection = by_patient ? "patient_cache" : "query_cache"

    contents = unpack_json(result_file)

    if type
      # The measure id lives under 'value' for patient-level results.
      if by_patient
        contents.select! { |row| measure_ids.include?(row['value']['measure_id']) }
      else
        contents.select! { |row| measure_ids.include?(row['measure_id']) }
      end
    end

    contents.each do |document|
      if by_patient
        # Set the patient_id to the actual _id of the newly created patient
        # record, when one is found for the medical record id.
        medical_record_id = document['value']['medical_record_id']
        patient = Record.by_patient_id(medical_record_id).first
        document['value']['patient_id'] = patient.id if patient
      end
      document['bundle_id'] = bundle.id
      Mongoid.default_client[collection].insert_one(document)
    end
  end
  puts "\rLoading: Results Complete          "
end

.unpack_and_store_system_js(zip) ⇒ Object



103
104
105
106
107
108
109
# File 'lib/health-data-standards/import/bundle/importer.rb', line 103

# Stores every JavaScript library file of the bundle via save_system_js_fn,
# named after the file's basename without the ".js" extension.
#
# @param zip opened bundle zip (responds to +glob+)
# @return the value of iterating the matched entries with +each+
def self.unpack_and_store_system_js(zip)
  library_entries = zip.glob(SOURCE_ROOTS[:libraries])
  library_entries.each do |library|
    function_name = Pathname.new(library.name).basename('.js').to_s
    save_system_js_fn(function_name, library.get_input_stream.read)
  end
end

.unpack_and_store_valuesets(zip, bundle) ⇒ Object



210
211
212
213
214
215
216
217
218
219
# File 'lib/health-data-standards/import/bundle/importer.rb', line 210

# Inserts every value set JSON file of the bundle into the ValueSet
# collection, tagged with the bundle's id.
#
# Documents are inserted through the collection directly (as_document +
# insert_one) rather than through the model's own save.
#
# @param zip    opened bundle zip (responds to +glob+)
# @param bundle bundle being imported (responds to +id+)
# @return [nil]
def self.unpack_and_store_valuesets(zip, bundle)
  entries = zip.glob(SOURCE_ROOTS[:valuesets])
  total = entries.length
  entries.each_with_index do |entry, position|
    value_set = HealthDataStandards::SVS::ValueSet.new(unpack_json(entry))
    value_set['bundle_id'] = bundle.id
    HealthDataStandards::SVS::ValueSet.collection.insert_one(value_set.as_document)
    # Report on every tenth value set only.
    report_progress('Value Sets', position * 100 / total) if (position % 10).zero?
  end
  puts "\rLoading: Value Sets Complete          "
end

.unpack_bundle(zip) ⇒ Object



99
100
101
# File 'lib/health-data-standards/import/bundle/importer.rb', line 99

# Builds a Bundle object from the bundle metadata file inside the zip.
#
# @param zip opened bundle zip (responds to +read+)
# @return [HealthDataStandards::CQM::Bundle] a new bundle built from the
#   parsed JSON; this method itself does not save it
def self.unpack_bundle(zip)
  raw_metadata = zip.read(SOURCE_ROOTS[:bundle])
  # Parsed with a nesting limit of 100 levels.
  attributes = JSON.parse(raw_metadata, max_nesting: 100)
  HealthDataStandards::CQM::Bundle.new(attributes)
end

.unpack_json(entry) ⇒ Object



249
250
251
# File 'lib/health-data-standards/import/bundle/importer.rb', line 249

# Reads a zip entry completely and parses its content as JSON.
#
# @param entry zip entry (responds to +get_input_stream+)
# @return the parsed JSON value, parsed with a nesting limit of 100 levels
def self.unpack_json(entry)
  raw = entry.get_input_stream.read
  JSON.parse(raw, max_nesting: 100)
end