Class: InstDataShipper::Destinations::HostedData
Constant Summary
Concerns::Chunking::DEFAULT_CHUNK_SIZE
Instance Attribute Summary
Attributes inherited from Base
#dumper
Instance Method Summary
#chunk_data, #cleanup_fatal_error, #finalize_dump, #group_key, #initialize_dump, #preinitialize_dump, #upload_data_chunk
Methods inherited from Base
#config, #group_key, #initialize, #user_config
Instance Method Details
#chunk_data(generator, table:, extra: nil) ⇒ Object
# File 'lib/inst_data_shipper/destinations/hosted_data.rb', line 51
def chunk_data(generator, table:, extra: nil)
  warehouse_name = table[:warehouse_name]

  super(generator) do |batch, idx|
    # Chunk file name: <warehouse_name>[.<extra>].<idx>.tsv.gz (compact drops a nil extra)
    bits = [warehouse_name, extra, idx].compact
    temp_file = "#{working_dir}/#{bits.join('.')}.tsv.gz"

    Zlib::GzipWriter.open(temp_file) do |gz|
      batch.each do |row|
        row = row.join("\t") if row.is_a?(Array)
        gz.puts(row)
      end
    end

    yield temp_file

    File.delete(temp_file)
  end
end
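For orientation, a minimal sketch (not part of the gem) of reading back one of the chunk files produced above. The file name is hypothetical, following the <warehouse_name>[.<extra>].<idx>.tsv.gz pattern:

require 'zlib'

# Read a gzipped TSV chunk: one row per line, columns joined with tabs
Zlib::GzipReader.open('users.0.tsv.gz') do |gz|
  gz.each_line do |line|
    puts line.chomp.split("\t").inspect
  end
end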
#cleanup_fatal_error ⇒ Object
# File 'lib/inst_data_shipper/destinations/hosted_data.rb', line 82
def cleanup_fatal_error
  # Discard the partially-built dump on the HostedData side, then clear local state
  hosted_data_client.delete("api/v1/custom_dumps/#{hd_dump_id}/", reason: 'Failure during extraction or transformation') if hd_dump_id.present?
  redis.del(rk(:state))
end
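A hedged sketch of when this hook fires; the driver code below is illustrative, not the gem's actual orchestration:

begin
  run_extraction_and_transformation   # hypothetical stand-in for the dumper's work
rescue => e
  destination.cleanup_fatal_error     # delete the half-built remote dump, clear Redis state
  raise
end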
#finalize_dump ⇒ Object
# File 'lib/inst_data_shipper/destinations/hosted_data.rb', line 77
def finalize_dump
  # Tell HostedData to begin importing the uploaded artifacts, then clear local state
  hosted_data_client.put("api/v1/custom_dumps/#{hd_dump_id}/", start_import: true) if hd_dump_id.present?
  redis.del(rk(:state))
end
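Together with the other hooks, this completes the dump lifecycle. A minimal, illustrative sketch of the order in which a dumper might drive the hooks on this class (the loop itself is hypothetical):

destination.preinitialize_dump(context)   # may widen context[:incremental_since]
destination.initialize_dump(context)      # POST the dump shell, remember its id in Redis
tables.each do |table|
  destination.chunk_data(rows_for(table), table: table) do |file|
    destination.upload_data_chunk(table, file)   # one gzipped TSV artifact per chunk
  end
end
destination.finalize_dump                 # PUT start_import: true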
#initialize_dump(context) ⇒ Object
# File 'lib/inst_data_shipper/destinations/hosted_data.rb', line 32
def initialize_dump(context)
  tags = [
    "ids-schema=#{dumper.schema_digest}",
    "ids-genre=#{dumper.export_genre}",
  ]
  tags << "ids-app=#{Rails.application.class.name.gsub(/::Application$/, '')}" if defined?(Rails) && Rails.application
  tags << "ids-schema-version=#{schema[:version]}" if schema[:version].present?

  dump = hosted_data_client.post(
    'api/v1/custom_dumps/',
    reference_id: tracker.id,
    schema: convert_schema,
    tags: tags,
  ).body.with_indifferent_access

  # Remember the remote dump id so later hooks can address it; expire in case finalize/cleanup never runs
  redis.hset(rk(:state), :dump_id, dump[:id])
  redis.expire(rk(:state), 30.days.to_i)
end
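As a concrete illustration of the tags built above (all values hypothetical: a Rails app named MyApp::Application with schema version '1.2.0'):

tags = [
  "ids-schema=f3a9c1...",      # dumper.schema_digest
  "ids-genre=canvas",          # dumper.export_genre
  "ids-app=MyApp",             # application class name minus the ::Application suffix
  "ids-schema-version=1.2.0",  # only present when schema[:version] is set
]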
#preinitialize_dump(context) ⇒ Object
# File 'lib/inst_data_shipper/destinations/hosted_data.rb', line 8
def preinitialize_dump(context)
  if context[:incremental_since].present?
    begin
      last_dump = hosted_data_client.get("api/v1/custom_dumps/last", {
        status: 'imported',
        tags: [
          "ids-schema=#{dumper.schema_digest}",
          "ids-genre=#{dumper.export_genre}",
        ],
      }).body.with_indifferent_access

      if last_dump[:created_at] < context[:incremental_since]
        InstDataShipper.logger.info("Last successful HostedData dump is older than incremental_since - bumping back incremental_since")
        context[:incremental_since] = last_dump[:created_at]
      end
    rescue Faraday::ResourceNotFound
      InstDataShipper.logger.info("No last successful HostedData dump of the same schema - not using incremental_since")
      context[:incremental_since] = nil
    end
  end
end
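A worked, hypothetical illustration of the clamp above: if the caller requests an incremental window of 3 days but the last imported dump was created 5 days ago, the window widens so the intervening rows are not skipped:

incremental_since    = 3.days.ago   # requested by the caller
last_dump_created_at = 5.days.ago   # from api/v1/custom_dumps/last

if last_dump_created_at < incremental_since
  incremental_since = last_dump_created_at   # widen to cover the 5-to-3-day gap
end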
#upload_data_chunk(table_def, chunk) ⇒ Object
# File 'lib/inst_data_shipper/destinations/hosted_data.rb', line 71
def upload_data_chunk(table_def, chunk)
  # Attach the gzipped chunk as a multipart artifact keyed by its table name
  hosted_data_client.put("api/v1/custom_dumps/#{hd_dump_id}/", artifacts: {
    table_name(table_def) => [Faraday::UploadIO.new(chunk, 'application/gzip')],
  })
end
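A minimal sketch of an equivalent multipart upload built by hand, assuming Faraday 1.x (where Faraday::UploadIO is available with the :multipart middleware); the host and dump id are hypothetical rather than the gem's configured hosted_data_client:

require 'faraday'

conn = Faraday.new(url: 'https://hosted-data.example.com') do |f|
  f.request :multipart     # encode UploadIO values as multipart/form-data
  f.request :url_encoded
  f.adapter Faraday.default_adapter
end

conn.put('api/v1/custom_dumps/123/', artifacts: {
  'users' => [Faraday::UploadIO.new('users.0.tsv.gz', 'application/gzip')],
})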