Class: AtlasEngine::AddressImporter::OpenAddress::GeoJsonImportJob
Constant Summary
collapse
- CHUNK_SIZE =
10_000
- REPORT_STEP =
5
- StringProps =
T.type_alias { T::Hash[String, T.untyped] }
- BatchOfRows =
T.type_alias { T::Array[StringProps] }
- FilterType =
T.type_alias { T.proc.params(arg0: StringProps).returns(T::Boolean) }
- Corrector =
AddressImporter::Corrections::Corrector
PreparesGeoJsonFile::ROOT_FOLDER
ImportLogHelper::TEST_TIMESTAMP
Instance Attribute Summary collapse
Instance Method Summary
collapse
#download_geojson
Methods included from LogHelper
#log_error, #log_info, #log_warn
#log_final_stats
#import_log_error, #import_log_info
#argument
Instance Attribute Details
#country_code ⇒ Object
Returns the value of attribute country_code.
27
28
29
|
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 27
# Reader for the @country_code instance variable.
#
# @return [Object] whatever is currently stored in @country_code (nil when unset)
def country_code = @country_code
|
#country_import ⇒ Object
Returns the value of attribute country_import.
27
28
29
|
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 27
# Reader for the @country_import instance variable.
#
# @return [Object] whatever is currently stored in @country_import (nil when unset)
def country_import = @country_import
|
#geojson_path ⇒ Object
Returns the value of attribute geojson_path.
27
28
29
|
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 27
# Reader for the @geojson_path instance variable.
#
# @return [Object] whatever is currently stored in @geojson_path (nil when unset)
def geojson_path = @geojson_path
|
#loader ⇒ Object
Returns the value of attribute loader.
27
28
29
|
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 27
# Reader for the @loader instance variable.
#
# @return [Object] whatever is currently stored in @loader (nil when unset)
def loader = @loader
|
#transformer ⇒ Object
Returns the value of attribute transformer.
27
28
29
|
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 27
# Reader for the @transformer instance variable.
#
# @return [Object] whatever is currently stored in @transformer (nil when unset)
def transformer = @transformer
|
Instance Method Details
#attributes_from_batch(batch) ⇒ Object
124
125
126
127
128
129
130
131
132
133
134
135
|
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 124
# Runs every feature of the batch through `transformer.transform` and
# collects the results.
#
# A feature whose transformation is nil is counted through
# `incr_invalid_lines` and left out of the returned list.
#
# @param batch [Enumerable] features to transform
# @return [Array] the truthy transformation results, in batch order
def attributes_from_batch(batch)
  batch.each_with_object([]) do |feature, kept|
    transformed = transformer.transform(feature)
    incr_invalid_lines if transformed.nil?
    # Only truthy results are retained, mirroring filter_map semantics.
    kept << transformed if transformed
  end
end
|
#build_enumerator(params, cursor:) ⇒ Object
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 65
# Builds the lazy enumerator of [features, chunk_num] pairs for the job.
#
# Lines read from `io` are grouped into chunks of CHUNK_SIZE. Chunks before
# the starting index are dropped, and numbering starts at that index. Each
# remaining chunk is reported to `track_progress`, parsed line by line with
# JSON.parse, and then narrowed down with `row_filter`.
#
# @param params [Object] not used by this method
# @param cursor [#to_i, nil] chunk index to start from; nil means start at chunk 0
# @return [Enumerator::Lazy] yields [Array of parsed features, chunk number]
def build_enumerator(params, cursor:)
  resuming = !cursor.nil?
  log_message = resuming ? "Starting import at chunk #{cursor}" : "Importing whole file"
  import_log_info(country_import: country_import, message: log_message)
  start_at = resuming ? cursor.to_i : 0

  chunks = io.each.each_slice(CHUNK_SIZE).lazy
  numbered_chunks = chunks.drop(start_at).with_index(start_at)

  parsed_chunks = numbered_chunks.map do |lines, chunk_num|
    track_progress(chunk_num)
    [lines.map { |line| JSON.parse(line) }, chunk_num]
  end

  parsed_chunks.map { |features, chunk_num| [features.select(&row_filter), chunk_num] }
end
|
#condense_addresses(addresses) ⇒ Object
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 107
# Collapses addresses that share province code, locale, city, street and zip
# into a single attribute hash per group.
#
# Within a group the hashes are merged left to right: for a key present on
# both sides, :building_and_unit_ranges values are merged together, while any
# other key takes the value from the later address.
#
# @param addresses [Array<Hash>] attribute hashes keyed by symbols
# @return [Array<Hash>] one merged hash per group, in first-seen group order
def condense_addresses(addresses)
  grouping_keys = [:province_code, :locale, :city, :street, :zip]
  groups = addresses.group_by { |attrs| grouping_keys.map { |key| attrs[key] } }

  groups.values.map do |same_street|
    same_street.reduce do |combined, address|
      combined.merge(address) do |field, current, incoming|
        field == :building_and_unit_ranges ? current.merge(incoming) : incoming
      end
    end
  end
end
|
#corrector ⇒ Object
178
179
180
181
182
183
184
185
186
187
|
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 178
# Lazily builds the corrector for the "open_address" source.
#
# The outcome is cached even when it is nil. A plain `||=` would consult the
# country profile's corrector list again on every call whenever no correctors
# are configured, so `defined?` is used to detect a prior evaluation instead.
#
# @return [Corrector, nil] nil when the country profile lists no correctors
#   for the "open_address" source
def corrector
  return @corrector if defined?(@corrector)

  @corrector =
    if country_profile.ingestion.correctors(source: "open_address").empty?
      nil
    else
      Corrector.new(
        country_code: country_code,
        source: "open_address",
      )
    end
end
|
#country_profile ⇒ Object
157
158
159
|
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 157
# Memoized lookup of the country profile for `country_code`.
#
# @return [Object] the result of CountryProfile.for(country_code), cached in
#   @country_profile once it is truthy
def country_profile
  return @country_profile if @country_profile

  @country_profile = CountryProfile.for(country_code)
end
|
#each_iteration(batch, element_id) ⇒ Object
92
93
94
95
96
97
98
99
100
101
|
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 92
# Handles one batch produced by the enumerator.
#
# Calls `exit_if_interrupted!` first, transforms the batch into address
# attributes, and, unless that list is blank, condenses it and hands the
# result to `loader.load`.
#
# @param batch [Object] the batch passed on to `attributes_from_batch`
# @param element_id [Object] not used by this method
# @return [Object, nil] the result of `loader.load`, or nil when there is nothing to load
def each_iteration(batch, element_id)
  exit_if_interrupted!(country_import)

  addresses = attributes_from_batch(batch)
  loader.load(condense_addresses(addresses)) unless addresses.blank?
end
|
#incr_invalid_lines ⇒ Object
197
198
199
200
201
202
203
|
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 197
# Adds one to the running count of invalid lines.
#
# The counter starts out unset; the first call must leave it at 1 (not 0),
# otherwise every total reported from it is one short.
#
# @return [Integer] the updated count
def incr_invalid_lines
  @invalid_lines = (@invalid_lines || 0) + 1
end
|
#invalid_lines ⇒ Object
206
207
208
|
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 206
# Number of invalid lines recorded so far.
#
# @return [Integer] the value of @invalid_lines, or 0 while it is still unset
def invalid_lines
  return 0 unless @invalid_lines

  @invalid_lines
end
|
#io ⇒ Object
192
193
194
|
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 192
# Opens the file at `geojson_path` in binary mode and wraps it in a gzip reader.
#
# A new file handle and reader are created on every call; nothing here
# closes them.
#
# @return [Zlib::GzipReader] reader over the decompressed file contents
def io
  raw_file = geojson_path.open("rb")
  Zlib::GzipReader.new(raw_file)
end
|
#row_filter ⇒ Object
164
165
166
167
168
169
170
171
172
173
|
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 164
# Memoized predicate used to decide which parsed rows are kept.
#
# The predicate is chosen from `country_profile.open_address[:filter]`:
# * nil: a lambda that accepts every row;
# * a string containing a namespaced constant name (e.g. "A::B"): that
#   constant is resolved with `constantize`, instantiated with the current
#   country import, and its `#filter` method is used as the predicate.
#
# Each pattern sits on its own line with its body below it; putting a
# pattern and its body on the same line without `then` does not parse.
#
# @return [Proc] callable taking one row and returning whether to keep it
# @raise [NoMatchingPatternError] when the configured filter matches neither pattern
def row_filter
  @row_filter ||= case country_profile.open_address[:filter]
  in nil
    ->(_row) { true }
  in /\w+(::\w+)+/ => sym
    cls = sym.constantize
    inst = cls.new(country_import: country_import)
    inst.method(:filter).to_proc
  end
end
|
#setup_and_download(&block) ⇒ Object
Setup boilerplate: JobIteration doesn’t let us override #perform. Instead the around_perform callback is used for that.
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 36
# Setup boilerplate: JobIteration doesn't let us override #perform, so the
# around_perform callback is used for that instead.
#
# Populates the job's state (loader, country code, geojson path, locale,
# country import and transformer) from the job arguments, logs that the
# download is starting, and then delegates to `download_geojson`, forwarding
# the given block.
#
# @param block [Proc] forwarded unchanged to `download_geojson`
# @return [Object] whatever `download_geojson` returns
def setup_and_download(&block)
  @loader = Loader.new
  @country_code = argument(:country_code)

  file_path = argument(:geojson_file_path)
  @geojson_path = Pathname.new(file_path)

  # The locale argument is optional; it is downcased only when present.
  @locale = argument(:locale)&.downcase

  import_id = argument(:country_import_id)
  @country_import = CountryImport.find(import_id)
  @transformer = Transformer.new(country_import: country_import, locale: @locale)

  log_params = { file_path: geojson_path.to_s }
  import_log_info(
    country_import: country_import,
    message: "Downloading geojson file",
    additional_params: log_params,
  )

  download_geojson(&block)
end
|
#track_progress(chunk_num) ⇒ Object
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
|
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 138
# Logs import progress once every REPORT_STEP chunks.
#
# On a reporting chunk, this logs the chunk number together with
# chunk_num * CHUNK_SIZE as the number of lines parsed so far. It logs the
# discarded-line count only when at least one line has been discarded.
# The previous `lines_parsed != invalid_lines` check logged
# "Lines discarded: 0" on clean imports and stayed silent when every parsed
# line had been discarded.
#
# @param chunk_num [Integer] index of the chunk about to be processed
# @return [Object, nil] not meaningful to callers
def track_progress(chunk_num)
  return unless (chunk_num % REPORT_STEP).zero?

  lines_parsed = chunk_num * CHUNK_SIZE
  import_log_info(
    country_import: country_import,
    message: "Processing chunk #{chunk_num}, lines parsed so far: #{lines_parsed}",
  )
  return unless invalid_lines.positive?

  import_log_info(
    country_import: country_import,
    message: "Lines discarded: #{invalid_lines}",
    category: :invalid_address,
  )
end
|