Class: Embulk::Output::Bigquery::GcsClient

Inherits:
GoogleClient show all
Defined in:
lib/embulk/output/bigquery/gcs_client.rb

Instance Method Summary collapse

Methods inherited from GoogleClient

#client, #with_network_retry

Constructor Details

#initialize(task) ⇒ GcsClient

Returns a new instance of GcsClient.



13
14
15
16
17
18
19
20
# File 'lib/embulk/output/bigquery/gcs_client.rb', line 13

# Sets up a Cloud Storage client for the given task.
#
# Hands the task, the cloud-platform OAuth scope and the Storage API service
# class to the superclass constructor, then caches the project and bucket
# names that the other methods of this class use as defaults.
#
# @param task [Hash] plugin task; 'project' and 'gcs_bucket' are read from it
#   (via @task, which is presumably assigned by the superclass constructor —
#   confirm against GoogleClient)
def initialize(task)
  super(
    task,
    "https://www.googleapis.com/auth/cloud-platform",
    Google::Apis::StorageV1::StorageService
  )

  @project = @task['project']
  @bucket = @task['gcs_bucket']
end

Instance Method Details

#delete_object(object, bucket: nil) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/embulk/output/bigquery/gcs_client.rb', line 89

# Deletes an object from a bucket, treating "not found" as success.
#
# @param object [String] object name; a single leading "/" is stripped
# @param bucket [String, nil] bucket name; falls back to @bucket when nil
# @return [Object, nil] the value returned by client.delete_object, or nil
#   when the API answers with HTTP status 404
# @raise [Error] for any other Google API server, client or authorization error
def delete_object(object, bucket: nil)
  bucket ||= @bucket
  object = object[1..-1] if object.start_with?('/')
  # Only used in log and error messages. Built by plain interpolation rather
  # than URI.join, which raises URI::InvalidURIError for object names that are
  # not valid URI references (e.g. names containing spaces) and would rewrite
  # names containing ':' or '..'.
  object_uri = "gs://#{bucket}/#{object}"
  opts = {}

  begin
    Embulk.logger.info { "embulk-output-bigquery: Delete object... #{@project}:#{object_uri}" }
    Embulk.logger.debug { "embulk-output-bigquery: delete_object(#{bucket}, #{object}, #{opts})" }
    with_network_retry { client.delete_object(bucket, object, opts) }
  rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
    return nil if e.status_code == 404 # ignore 'notFound' error

    response = {status_code: e.status_code, message: e.message, error_class: e.class}
    Embulk.logger.error {
      "embulk-output-bigquery: delete_object(#{bucket}, #{object}, #{opts}), response:#{response}"
    }
    raise Error, "failed to delete object #{@project}:#{object_uri}, response:#{response}"
  end
end

#insert_bucket(bucket = nil) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/embulk/output/bigquery/gcs_client.rb', line 22

# Creates a bucket in @project, tolerating the case where it already exists.
#
# @param bucket [String, nil] bucket name; falls back to @bucket when nil
# @return [Object, nil] the value returned by client.insert_bucket, or nil
#   when the API answers 409 with a "conflict:" message
# @raise [Error] for any other Google API server, client or authorization error
def insert_bucket(bucket = nil)
  bucket ||= @bucket

  Embulk.logger.info { "embulk-output-bigquery: Insert bucket... #{@project}:#{bucket}" }
  body = { name: bucket }
  opts = {}
  Embulk.logger.debug { "embulk-output-bigquery: insert_bucket(#{@project}, #{body}, #{opts})" }

  with_network_retry { client.insert_bucket(@project, body, opts) }
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => err
  # A 409 whose message mentions "conflict:" is the 'Already Exists' case; ignore it.
  already_exists = err.status_code == 409 && /conflict:/ =~ err.message
  return nil if already_exists

  failure = {status_code: err.status_code, message: err.message, error_class: err.class}
  Embulk.logger.error do
    "embulk-output-bigquery: insert_bucket(#{@project}, #{body}, #{opts}), response:#{failure}"
  end
  raise Error, "failed to insert bucket #{@project}:#{bucket}, response:#{failure}"
end

#insert_object(path, object: nil, bucket: nil) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/embulk/output/bigquery/gcs_client.rb', line 46

# Uploads a local file to a bucket as a single object.
#
# @param path [String] upload source handed to the client as :upload_source
# @param object [String, nil] destination object name; defaults to +path+.
#   A single leading "/" is stripped.
# @param bucket [String, nil] bucket name; falls back to @bucket when nil
# @return [Object] the value returned by client.insert_object
# @raise [Error] when the Google API reports a server, client or
#   authorization error
def insert_object(path, object: nil, bucket: nil)
  bucket ||= @bucket
  object ||= path
  object = object[1..-1] if object.start_with?('/')
  # Only used in log and error messages. Built by plain interpolation rather
  # than URI.join, which raises URI::InvalidURIError for object names that are
  # not valid URI references (e.g. names containing spaces) and would rewrite
  # names containing ':' or '..'.
  object_uri = "gs://#{bucket}/#{object}"

  body = {
    name: object,
  }
  opts = {
    upload_source: path,
    content_type: 'application/octet-stream'
  }

  begin
    Embulk.logger.info { "embulk-output-bigquery: Insert object... #{path} => #{@project}:#{object_uri}" }
    Embulk.logger.debug { "embulk-output-bigquery: insert_object(#{bucket}, #{body}, #{opts})" }
    # memo: gcs is strongly consistent for insert (read-after-write). ref: https://cloud.google.com/storage/docs/consistency
    with_network_retry { client.insert_object(bucket, body, opts) }
  rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
    response = {status_code: e.status_code, message: e.message, error_class: e.class}
    Embulk.logger.error {
      "embulk-output-bigquery: insert_object(#{bucket}, #{body}, #{opts}), response:#{response}"
    }
    raise Error, "failed to insert object #{@project}:#{object_uri}, response:#{response}"
  end
end

#insert_objects(paths, objects: nil, bucket: nil) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/embulk/output/bigquery/gcs_client.rb', line 75

# Uploads several local files, one insert_object call per path, in order.
#
# @param paths [Array<String>] upload sources
# @param objects [Array<String>, nil] destination object names, matched to
#   +paths+ by index; defaults to +paths+
# @param bucket [String, nil] bucket name; falls back to @bucket when nil
# @return [Array] the insert_object results in the same order as +paths+
#   (an empty array when +paths+ is empty)
# @raise [RuntimeError] when +paths+ and +objects+ differ in size
def insert_objects(paths, objects: nil, bucket: nil)
  return [] if paths.empty?

  bucket ||= @bucket
  objects ||= paths
  unless paths.size == objects.size
    raise "number of paths and objects are different"
  end

  paths.each_with_index.map do |path, idx|
    insert_object(path, object: objects[idx], bucket: bucket)
  end
end