Class: Magellan::Gcs::Proxy::Context
Constant Summary
Constants included
from Log
Log::CLOUD_LOGGING_RESOURCE_KEYS
Instance Attribute Summary collapse
Instance Method Summary
collapse
#build_notifier, #notifier, #notify, #process_with_notification
Methods included from Log
build_cloud_logging_logger, build_logger, build_loggers, logger, loggers, verbose
Constructor Details
#initialize(message) ⇒ Context
Returns a new instance of Context.
16
17
18
19
20
|
# File 'lib/magellan/gcs/proxy/context.rb', line 16
# Builds a context around a single received message.
#
# The message's 'download_files' attribute, when present and non-empty, is
# parsed as JSON and kept as the description of the remote files to fetch.
# The workspace starts out unset; #setup assigns it.
#
# @param message [Object] message responding to +attributes+ (a Hash-like
#   object keyed by attribute name)
def initialize(message)
  @message = message
  raw_download_files = message.attributes['download_files']
  @remote_download_files = parse_json(raw_download_files)
  @workspace = nil
end
|
Instance Attribute Details
#message ⇒ Object
Returns the value of attribute message.
15
16
17
|
# File 'lib/magellan/gcs/proxy/context.rb', line 15
# The message this context was constructed with (the argument given to
# #initialize).
#
# @return [Object] the stored message
def message
@message
end
|
#remote_download_files ⇒ Object
Returns the value of attribute remote_download_files.
15
16
17
|
# File 'lib/magellan/gcs/proxy/context.rb', line 15
# The parsed 'download_files' attribute of the message, as produced by
# #parse_json in #initialize. It is nil when the attribute was missing or an
# empty string.
#
# @return [Object, nil] the parsed JSON value
def remote_download_files
@remote_download_files
end
|
#workspace ⇒ Object
Returns the value of attribute workspace.
15
16
17
|
# File 'lib/magellan/gcs/proxy/context.rb', line 15
# Path of the working directory for this context. It is nil after
# #initialize and is assigned the temporary directory created by #setup.
#
# @return [String, nil] the workspace directory path
def workspace
@workspace
end
|
Instance Method Details
#build_local_files_obj(obj, mapping) ⇒ Object
120
121
122
123
124
125
126
127
|
# File 'lib/magellan/gcs/proxy/context.rb', line 120
# Produces a copy of +obj+ with the same Hash/Array nesting in which every
# String leaf is replaced by its entry in +mapping+.
#
# Strings missing from +mapping+ become nil; leaves that are neither Hash,
# Array nor String are returned untouched.
#
# @param obj [Hash, Array, String, Object] structure of remote URLs
# @param mapping [Hash{String => String}] remote URL => local path
# @return [Object] structure mirroring +obj+ with mapped leaves
def build_local_files_obj(obj, mapping)
  case obj
  when String
    mapping[obj]
  when Array
    obj.map { |element| build_local_files_obj(element, mapping) }
  when Hash
    obj.map { |key, value| [key, build_local_files_obj(value, mapping)] }.to_h
  else
    obj
  end
end
|
#build_mapping(base_dir, obj) ⇒ Object
93
94
95
96
97
98
|
# File 'lib/magellan/gcs/proxy/context.rb', line 93
# Builds the remote-URL => local-path table for every URL found in +obj+.
#
# Each URL is validated through #parse_uri and mapped to
# +base_dir/<host>/<path>+.
#
# @param base_dir [String] directory under which local paths are placed
# @param obj [Hash, Array, String, nil] structure whose leaves are URLs
# @return [Hash{String => String}] remote URL => local file path
# @raise [RuntimeError] (from #parse_uri) when a URL has an unsupported scheme
def build_mapping(base_dir, obj)
  # flatten_values hands back a bare scalar when +obj+ itself is a scalar
  # (e.g. download_files is a single URL string); Array() normalizes that to
  # a one-element list so the following #flatten cannot fail.
  urls = Array(flatten_values(obj)).flatten
  urls.each_with_object({}) do |url, d|
    uri = parse_uri(url)
    d[url] = File.join(base_dir, uri.host, uri.path)
  end
end
|
#directory?(path) ⇒ Boolean
85
86
87
|
# File 'lib/magellan/gcs/proxy/context.rb', line 85
# Tells whether +path+ names an existing directory.
#
# @param path [String] file system path to check
# @return [Boolean] true when +path+ exists and is a directory
def directory?(path)
  FileTest.directory?(path)
end
|
#download ⇒ Object
53
54
55
56
57
58
59
60
61
62
63
64
65
|
# File 'lib/magellan/gcs/proxy/context.rb', line 53
# Downloads every entry of #download_mapping from GCS to its local path.
#
# The parent directory of each local path is created first. When
# Proxy.config[:dryrun] is set, only the directory creation and the debug
# log happen; no storage call is made.
#
# @return [Hash] the download mapping that was iterated
# @raise [RuntimeError] when a URL has an unsupported scheme (via
#   #parse_uri), or when the bucket or the file cannot be found
def download
  download_mapping.each do |url, path|
    FileUtils.mkdir_p File.dirname(path)
    logger.debug("Downloading: #{url} to #{path}")
    next if Proxy.config[:dryrun]
    uri = parse_uri(url)
    @last_bucket_name = uri.host
    bucket = GCP.storage.bucket(@last_bucket_name)
    # The storage client returns nil (rather than raising) for an unknown
    # bucket or object; fail with a message naming the resource instead of
    # a NoMethodError on nil.
    raise "Bucket not found: #{@last_bucket_name} for #{url}" unless bucket
    # Object names in GCS carry no leading slash, unlike the URI path.
    file = bucket.file uri.path.sub(%r{\A/}, '')
    raise "File not found: #{url}" unless file
    file.download(path)
    logger.info("Download OK: #{url} to #{path}")
  end
end
|
#download_mapping ⇒ Object
40
41
42
|
# File 'lib/magellan/gcs/proxy/context.rb', line 40
# Remote-URL => local-path table for the files to download, rooted at
# #downloads_dir. Built on first use and cached afterwards.
#
# @return [Hash{String => String}] remote URL => local file path
def download_mapping
  return @download_mapping if @download_mapping

  @download_mapping = build_mapping(downloads_dir, remote_download_files)
end
|
#downloads_dir ⇒ Object
36
37
38
|
# File 'lib/magellan/gcs/proxy/context.rb', line 36
# Directory inside the workspace that receives downloaded files.
#
# @return [String] +<workspace>/downloads+
def downloads_dir
  root = workspace
  File.join(root, 'downloads')
end
|
#flatten_values(obj) ⇒ Object
100
101
102
103
104
105
106
107
|
# File 'lib/magellan/gcs/proxy/context.rb', line 100
# Strips Hash keys out of a nested structure, keeping only the values.
#
# nil becomes an empty Array, a Hash is replaced by the result for its
# values, and an Array is processed element by element. The Array nesting
# is preserved (the result is NOT one-level flat) and any other object is
# returned as-is, so callers wanting a flat list must flatten the result.
#
# @param obj [Hash, Array, Object, nil] structure to process
# @return [Array, Object] nested Array of leaf values, or +obj+ itself when
#   it is a non-nil scalar
def flatten_values(obj)
  return [] if obj.nil?
  return flatten_values(obj.values) if obj.is_a?(Hash)
  return obj.map { |element| flatten_values(element) } if obj.is_a?(Array)

  obj
end
|
#local_download_files ⇒ Object
Also known as:
download_files
44
45
46
|
# File 'lib/magellan/gcs/proxy/context.rb', line 44
# The download_files structure with every remote URL replaced by the local
# path it is mapped to in #download_mapping. A truthy result is cached.
#
# @return [Object, nil] structure shaped like #remote_download_files with
#   local paths as leaves
def local_download_files
  return @local_download_files if @local_download_files

  @local_download_files = build_local_files_obj(remote_download_files, download_mapping)
end
|
#ltsv(hash) ⇒ Object
32
33
34
|
# File 'lib/magellan/gcs/proxy/context.rb', line 32
# Renders +hash+ as one LTSV line: "label:value" fields separated by tabs.
#
# @param hash [Hash] labels and values to render
# @return [String] tab-joined "label:value" fields ('' for an empty hash)
def ltsv(hash)
  fields = hash.map do |label, value|
    "#{label}:#{value}"
  end
  fields.join("\t")
end
|
#parse_json(str) ⇒ Object
109
110
111
112
|
# File 'lib/magellan/gcs/proxy/context.rb', line 109
# Parses +str+ as JSON, treating nil and the empty string as "no value".
#
# @param str [String, nil] JSON text
# @return [Object, nil] parsed value, or nil when +str+ is nil or empty
# @raise [JSON::ParserError] when +str+ is not valid JSON
def parse_json(str)
  blank = str.nil? || str.empty?
  blank ? nil : JSON.parse(str)
end
|
#parse_uri(str) ⇒ Object
114
115
116
117
118
|
# File 'lib/magellan/gcs/proxy/context.rb', line 114
# Parses +str+ as a URI and accepts it only if its scheme is 'gs'.
#
# @param str [String] URL to parse
# @return [URI::Generic] the parsed URI
# @raise [RuntimeError] when the scheme is anything other than 'gs'
def parse_uri(str)
  URI.parse(str).tap do |parsed|
    raise "Unsupported scheme #{parsed.scheme.inspect} of #{str}" unless parsed.scheme == 'gs'
  end
end
|
#setup ⇒ Object
22
23
24
25
26
27
28
29
30
|
# File 'lib/magellan/gcs/proxy/context.rb', line 22
# Runs the given block inside a freshly prepared workspace.
#
# A temporary directory (prefix 'workspace') is created and stored as the
# workspace, the downloads/uploads directories are created via #setup_dirs,
# and the block is then yielded to from within PubsubSustainer.run for this
# context's message. Dir.mktmpdir removes the directory when its block ends.
#
# @yield the work to perform while the workspace exists
# @return [Object] whatever PubsubSustainer.run returns
def setup
  Dir.mktmpdir('workspace') do |tmp_dir|
    @workspace = tmp_dir
    setup_dirs
    PubsubSustainer.run(message) { yield }
  end
end
|
#setup_dirs ⇒ Object
89
90
91
|
# File 'lib/magellan/gcs/proxy/context.rb', line 89
# Creates the downloads and uploads directories (with any missing parents).
#
# @return [Array<Symbol>] the names of the directory methods that were used
def setup_dirs
  %i[downloads_dir uploads_dir].each do |dir_method|
    FileUtils.mkdir_p(send(dir_method))
  end
end
|
#upload ⇒ Object
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
# File 'lib/magellan/gcs/proxy/context.rb', line 67
# Uploads everything found under #uploads_dir to GCS.
#
# Each top-level entry of the uploads directory is taken as a bucket name;
# every non-directory entry below it is uploaded to
# +gs://<bucket_name>/<relative path>+. When Proxy.config[:dryrun] is set,
# files are only logged and no storage call is made.
#
# @return [Object] the result of the outermost Dir.chdir block
# @raise [RuntimeError] when a bucket named by a directory cannot be found
def upload
  Dir.chdir(uploads_dir) do
    Dir.glob('*') do |bucket_name|
      Dir.chdir(bucket_name) do
        # Looked up lazily and at most once per bucket directory: the lookup
        # does not depend on the file, and dry-run must not touch storage.
        bucket = nil
        Dir.glob('**/*') do |path|
          next if directory?(path)
          url = "gs://#{bucket_name}/#{path}"
          logger.info("Uploading: #{path} to #{url}")
          next if Proxy.config[:dryrun]
          bucket ||= GCP.storage.bucket(bucket_name)
          # The storage client returns nil for an unknown bucket; report it
          # by name instead of failing with NoMethodError on nil.
          raise "Bucket not found: #{bucket_name} for #{url}" unless bucket
          # cwd is the bucket directory, so the relative path serves as both
          # the local source and the remote object name.
          bucket.create_file path, path
          logger.info("Upload OK: #{path} to #{url}")
        end
      end
    end
  end
end
|
#uploads_dir ⇒ Object
49
50
51
|
# File 'lib/magellan/gcs/proxy/context.rb', line 49
# Directory inside the workspace whose contents are uploaded by #upload.
#
# @return [String] +<workspace>/uploads+
def uploads_dir
  root = workspace
  File.join(root, 'uploads')
end
|