Module: PatriotGCP::Ext::BigQuery

Included in:
Command::LoadToBigQueryCommand
Defined in:
lib/patriot_gcp/ext/bigquery.rb

Defined Under Namespace

Classes: BigQueryException

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(cls) ⇒ Object



9
10
11
# File 'lib/patriot_gcp/ext/bigquery.rb', line 9

# Hook run when this module is included into +cls+; it additionally mixes
# Patriot::Util::System into that same class or module.
#
# @param cls [Module] the class or module including this module
# @return [Module] +cls+ itself
def self.included(cls)
  cls.class_eval { include Patriot::Util::System }
end

Instance Method Details

#_bq_load(filename, project_id, dataset_id, table_id, auth_client, api_client, schema, options, polling_interval) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/patriot_gcp/ext/bigquery.rb', line 92

# Registers a BigQuery load job that uploads +filename+ and then waits for
# the job through +_poll+.
#
# @param filename [String] path handed to Google::APIClient::UploadIO
# @param project_id [String] used both as request parameter and in the job body
# @param dataset_id [String] destination dataset
# @param table_id [String] destination table
# @param auth_client [Object] passed to the request as +:authorization+
# @param api_client [Object] client providing +discovered_api+ and +execute+
# @param schema [Object] forwarded to +_make_body+
# @param options [Hash, nil] forwarded to +_make_body+
# @param polling_interval [Integer] forwarded to +_poll+
# @return [Object] whatever +_poll+ returns
# @raise [BigQueryException] when no job id can be read from the response
def _bq_load(filename,
             project_id,
             dataset_id,
             table_id,
             auth_client,
             api_client,
             schema,
             options,
             polling_interval)

  bigquery_api = api_client.discovered_api('bigquery', 'v2')
  job_config = _make_body(project_id, dataset_id, table_id, schema, options)
  upload = Google::APIClient::UploadIO.new(filename, "application/octet-stream")

  request_params = {
    'projectId' => project_id,
    'uploadType' => 'multipart'
  }
  result = api_client.execute(
    :api_method => bigquery_api.jobs.insert,
    :parameters => request_params,
    :body_object => job_config,
    :authorization => auth_client,
    :media => upload
  )

  # Any failure while digging the job id out of the response (unparsable
  # body, missing 'jobReference', ...) is reported as a registration failure.
  job_id =
    begin
      JSON.parse(result.response.body)['jobReference']['jobId']
    rescue
      raise BigQueryException, "failed to register job: #{result.response.body}"
    end

  _poll(bigquery_api, api_client, auth_client, project_id, job_id, polling_interval)
end

#_get_api_client ⇒ Object



28
29
30
31
32
# File 'lib/patriot_gcp/ext/bigquery.rb', line 28

# Builds a Google::APIClient identified by this project's name and version
# (taken from the VERSION constants).
#
# @return [Google::APIClient] a new client instance
def _get_api_client
  app_name = VERSION::PROJECT_NAME
  app_version = VERSION::VERSION

  Google::APIClient.new(:application_name => app_name,
                        :application_version => app_version)
end

#_get_auth_client(p12_key, key_pass, email) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
# File 'lib/patriot_gcp/ext/bigquery.rb', line 15

# Creates an OAuth2 client for the BigQuery scope, signed with a PKCS#12
# key, and fetches an access token before handing the client back.
#
# @param p12_key [Object] passed to KeyUtils.load_from_pkcs12 together with +key_pass+
# @param key_pass [String] passphrase for the key
# @param email [String] used as the token issuer
# @return [Signet::OAuth2::Client] the client, after +fetch_access_token!+ was called
def _get_auth_client(p12_key, key_pass, email)
  signing_key = Google::APIClient::KeyUtils.load_from_pkcs12(p12_key, key_pass)
  # The same endpoint serves as both token URI and audience.
  token_endpoint = 'https://accounts.google.com/o/oauth2/token'

  Signet::OAuth2::Client.new(
    :token_credential_uri => token_endpoint,
    :audience => token_endpoint,
    :scope => 'https://www.googleapis.com/auth/bigquery',
    :issuer => email,
    :signing_key => signing_key
  ).tap { |client| client.fetch_access_token! }
end

#_make_body(project_id, dataset_id, table_id, schema, options) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/patriot_gcp/ext/bigquery.rb', line 35

# Assembles the request body for a load job.
#
# The result has the shape
#   { 'configuration' => { 'load' => { 'schema' => ..., 'destinationTable' => {...}, <options> } } }
#
# @param project_id [String] destination project
# @param dataset_id [String] destination dataset
# @param table_id [String] destination table
# @param schema [Object] stored as-is under 'schema'
# @param options [Hash, nil] extra entries copied into the 'load' section;
#   they are written last, so a key such as 'schema' replaces the default entry
# @return [Hash] the job body
def _make_body(project_id, dataset_id, table_id, schema, options)
  destination = {
    'projectId' => project_id,
    'datasetId' => dataset_id,
    'tableId'   => table_id
  }
  load_config = {
    'schema' => schema,
    'destinationTable' => destination
  }
  options.each { |name, setting| load_config[name] = setting } if options

  { 'configuration' => { 'load' => load_config } }
end

#_poll(bq_client, api_client, auth_client, project_id, job_id, polling_interval) ⇒ Object

Raises:

(BigQueryException)



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/patriot_gcp/ext/bigquery.rb', line 58

# Polls a BigQuery job until it reports the state 'DONE' or the polling
# budget is used up.
#
# The job is queried at most +polling_interval+ times with a 60 second pause
# between consecutive queries. No pause follows the last query, so a timeout
# is reported as soon as the final status check has come back.
#
# @param bq_client [Object] discovered BigQuery API; its +jobs.get+ is the method called
# @param api_client [Object] client whose +execute+ performs the request
# @param auth_client [Object] passed to the request as +:authorization+
# @param project_id [String] project owning the job
# @param job_id [String] id of the job to watch
# @param polling_interval [Integer] maximum number of status checks
# @return [Object] the "statistics" entry of the finished job
# @raise [BigQueryException] if the response carries no "status", if the
#   finished job reports "errors", or if the job is not DONE after the last check
def _poll(bq_client,
          api_client,
          auth_client,
          project_id,
          job_id,
          polling_interval)

  polling_interval.times do |attempt|
    result = api_client.execute(
      :api_method => bq_client.jobs.get,
      :parameters => {
        'jobId' => job_id,
        'projectId' => project_id
      },
      :headers => {'Content-Type' => 'application/json; charset=UTF-8'},
      :authorization => auth_client
    )
    response = JSON.parse(result.response.body)

    # A payload without "status" (e.g. an API error document) cannot be
    # interpreted; surface it instead of failing with NoMethodError on nil.
    status = response["status"]
    unless status
      raise BigQueryException, "failed to get job status: #{response}"
    end

    if status["state"] == 'DONE'
      # NOTE(review): the job resource also has a final `errorResult` field;
      # confirm whether `errors` alone is the intended failure signal.
      if status["errors"]
        raise BigQueryException, "upload failed: #{status['errors']}"
      end
      return response["statistics"]
    end

    # Pause only when another status check will follow.
    sleep 60 if attempt < polling_interval - 1
  end

  raise BigQueryException,"registered job didn't finish within: #{polling_interval} mins. please check if it will finish later on. jobId: #{job_id}"
end

#bq_load(filename, p12_key, key_pass, email, project_id, dataset_id, table_id, schema, options = nil, polling_interval = nil) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/patriot_gcp/ext/bigquery.rb', line 132

# Entry point for loading a file into a BigQuery table: creates the API and
# auth clients, then delegates to +_bq_load+.
#
# @param filename [String] file to upload
# @param p12_key [Object] key passed to +_get_auth_client+
# @param key_pass [String] passphrase passed to +_get_auth_client+
# @param email [String] account passed to +_get_auth_client+
# @param project_id [String] destination project
# @param dataset_id [String] destination dataset
# @param table_id [String] destination table
# @param schema [Object] table schema forwarded to +_bq_load+
# @param options [Hash, nil] extra load options; an empty hash is used when nil
# @param polling_interval [Integer, nil] forwarded to +_bq_load+; 60 is used when nil
# @return [Object] the result of +_bq_load+
def bq_load(filename,
            p12_key,
            key_pass,
            email,
            project_id,
            dataset_id,
            table_id,
            schema,
            options=nil,
            polling_interval=nil)

  load_options = options || {}
  max_polls = polling_interval || 60

  api = _get_api_client()
  auth = _get_auth_client(p12_key, key_pass, email)

  _bq_load(filename, project_id, dataset_id, table_id,
           auth, api, schema, load_options, max_polls)
end