Class: Embulk::Output::Bigquery::GcsClient
- Inherits: GoogleClient
  - Ancestors: Object → GoogleClient → Embulk::Output::Bigquery::GcsClient
- Defined in:
- lib/embulk/output/bigquery/gcs_client.rb
Instance Method Summary
- #delete_object(object, bucket: nil) ⇒ Object
- #initialize(task) ⇒ GcsClient (constructor): returns a new instance of GcsClient.
- #insert_bucket(bucket = nil) ⇒ Object
- #insert_object(path, object: nil, bucket: nil) ⇒ Object
- #insert_objects(paths, objects: nil, bucket: nil) ⇒ Object
Methods inherited from GoogleClient
Constructor Details
#initialize(task) ⇒ GcsClient
Returns a new instance of GcsClient.
13 14 15 16 17 18 19 20 |
# Creates a GCS client (defined in lib/embulk/output/bigquery/gcs_client.rb).
#
# Hands GoogleClient the cloud-platform OAuth scope and the StorageV1 service
# class, then caches the project id and default bucket taken from the task.
# NOTE(review): @task is read here but not assigned in this method; presumably
# GoogleClient#initialize sets it from +task+ — confirm.
#
# @param task hash-like plugin task; its 'project' and 'gcs_bucket' entries are read
def initialize(task)
  super(task, "https://www.googleapis.com/auth/cloud-platform", Google::Apis::StorageV1::StorageService)
  @project, @bucket = @task['project'], @task['gcs_bucket']
end
Instance Method Details
#delete_object(object, bucket: nil) ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# Deletes one object from GCS (defined in lib/embulk/output/bigquery/gcs_client.rb).
#
# A single leading '/' is stripped from +object+ so that absolute local-style
# paths map onto object names. A 404 response means the object is already gone,
# so it is ignored.
#
# @param object [String] object name; one leading '/' is removed
# @param bucket [String, nil] bucket name; defaults to @bucket
# @return whatever the storage client's delete_object returns, or nil on 404
# @raise [Error] for any other ServerError / ClientError / AuthorizationError
def delete_object(object, bucket: nil)
  bucket ||= @bucket
  object = object.start_with?('/') ? object[1..-1] : object
  # Plain interpolation instead of URI.join: GCS object names may contain
  # characters (e.g. spaces) that URI.join rejects with URI::InvalidURIError,
  # and this string is only used in log/error messages.
  object_uri = "gs://#{bucket}/#{object}"
  begin
    Embulk.logger.info { "embulk-output-bigquery: Delete object... #{@project}:#{object_uri}" }
    opts = {}
    Embulk.logger.debug { "embulk-output-bigquery: delete_object(#{bucket}, #{object}, #{opts})" }
    with_network_retry { client.delete_object(bucket, object, opts) }
  rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
    # ignore 'notFound' error: the object is already absent
    return nil if e.status_code == 404

    response = {status_code: e.status_code, message: e.message, error_class: e.class}
    Embulk.logger.error { "embulk-output-bigquery: delete_object(#{bucket}, #{object}, #{opts}), response:#{response}" }
    raise Error, "failed to delete object #{@project}:#{object_uri}, response:#{response}"
  end
end
#insert_bucket(bucket = nil) ⇒ Object
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# Creates a GCS bucket in @project (defined in lib/embulk/output/bigquery/gcs_client.rb).
#
# A 409 response whose message matches /conflict:/ is treated as
# "bucket already exists" and ignored.
#
# @param bucket [String, nil] bucket name; defaults to @bucket
# @return whatever the storage client's insert_bucket returns, or nil when the
#   bucket already exists
# @raise [Error] for any other ServerError / ClientError / AuthorizationError
def insert_bucket(bucket = nil)
  bucket ||= @bucket
  begin
    Embulk.logger.info { "embulk-output-bigquery: Insert bucket... #{@project}:#{bucket}" }
    body = {
      name: bucket,
    }
    opts = {}
    Embulk.logger.debug { "embulk-output-bigquery: insert_bucket(#{@project}, #{body}, #{opts})" }
    with_network_retry { client.insert_bucket(@project, body, opts) }
  rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
    if e.status_code == 409 && /conflict:/ =~ e.message
      # ignore 'Already Exists' error
      return nil
    end

    response = {status_code: e.status_code, message: e.message, error_class: e.class}
    Embulk.logger.error { "embulk-output-bigquery: insert_bucket(#{@project}, #{body}, #{opts}), response:#{response}" }
    raise Error, "failed to insert bucket #{@project}:#{bucket}, response:#{response}"
  end
end
#insert_object(path, object: nil, bucket: nil) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# Uploads a local file to GCS (defined in lib/embulk/output/bigquery/gcs_client.rb).
#
# The object name defaults to +path+; one leading '/' is stripped from it.
# The file is sent with content type 'application/octet-stream'.
#
# @param path [String] local file passed to the client as upload_source
# @param object [String, nil] object name; defaults to +path+
# @param bucket [String, nil] bucket name; defaults to @bucket
# @return whatever the storage client's insert_object returns
# @raise [Error] on ServerError / ClientError / AuthorizationError
def insert_object(path, object: nil, bucket: nil)
  bucket ||= @bucket
  object ||= path
  object = object.start_with?('/') ? object[1..-1] : object
  # Plain interpolation instead of URI.join: local paths / object names may
  # contain characters (e.g. spaces) that URI.join rejects with
  # URI::InvalidURIError, which would abort the upload before it starts.
  # This string is only used in log/error messages.
  object_uri = "gs://#{bucket}/#{object}"
  begin
    Embulk.logger.info { "embulk-output-bigquery: Insert object... #{path} => #{@project}:#{object_uri}" }
    body = {
      name: object,
    }
    opts = {
      upload_source: path,
      content_type: 'application/octet-stream'
    }
    Embulk.logger.debug { "embulk-output-bigquery: insert_object(#{bucket}, #{body}, #{opts})" }
    # memo: gcs is strongly consistent for insert (read-after-write). ref: https://cloud.google.com/storage/docs/consistency
    with_network_retry { client.insert_object(bucket, body, opts) }
  rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
    response = {status_code: e.status_code, message: e.message, error_class: e.class}
    Embulk.logger.error { "embulk-output-bigquery: insert_object(#{bucket}, #{body}, #{opts}), response:#{response}" }
    raise Error, "failed to insert object #{@project}:#{object_uri}, response:#{response}"
  end
end
#insert_objects(paths, objects: nil, bucket: nil) ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 |
# Uploads several local files to GCS, one insert_object call per file
# (defined in lib/embulk/output/bigquery/gcs_client.rb).
#
# Empty +paths+ returns [] immediately, before +objects+ is checked.
#
# @param paths [Array<String>] local files to upload
# @param objects [Array<String>, nil] object names paired with +paths+ by
#   position; defaults to +paths+
# @param bucket [String, nil] bucket name; defaults to @bucket
# @return [Array] the insert_object results, in +paths+ order
# @raise [RuntimeError] when +paths+ and +objects+ differ in size
def insert_objects(paths, objects: nil, bucket: nil)
  return [] if paths.empty?

  bucket ||= @bucket
  objects ||= paths
  raise "number of paths and objects are different" if paths.size != objects.size

  paths.zip(objects).map do |path, object|
    insert_object(path, object: object, bucket: bucket)
  end
end