Module: TreasureData::API::Job
- Included in:
- TreasureData::API
- Defined in:
- lib/td/client/api/job.rb
Defined Under Namespace
Classes: HTTPServerException, NullInflate
Instance Method Summary
- #hive_query(q, db = nil, result_url = nil, priority = nil, retry_limit = nil, opts = {}) ⇒ String
  Returns the job_id.
- #job_result(job_id) ⇒ Array
- #job_result_each(job_id, &block) ⇒ nil
  The block is optional; when given, it is called with 1 argument (each decoded row).
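- #job_result_each_with_compr_size(job_id) ⇒ nil
  The block is optional; when given, it is called with 2 arguments: each decoded row and the total size yielded by the result download (presumably the compressed size, per the method name).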
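- #job_result_format(job_id, format, io = nil) ⇒ nil, String
  Returns the result as a String when io is nil; otherwise writes it to io and returns nil. The optional block accepts 1 argument (the total size) and is only called when io is given.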
- #job_result_raw(job_id, format, io = nil) ⇒ String
- #job_status(job_id) ⇒ String
  Returns the job's status (the 'status' field of the API response), not the HTTP status code.
- #kill(job_id) ⇒ String
- #list_jobs(from = 0, to = nil, status = nil, conditions = nil) ⇒ Array
- #pig_query(q, db = nil, result_url = nil, priority = nil, retry_limit = nil, opts = {}) ⇒ String
  Returns the job_id.
- #query(q, type = :hive, db = nil, result_url = nil, priority = nil, retry_limit = nil, opts = {}) ⇒ String
  Returns the job_id.
- #show_job(job_id) ⇒ Array
Instance Method Details
#hive_query(q, db = nil, result_url = nil, priority = nil, retry_limit = nil, opts = {}) ⇒ String
Returns job_id.
# Issues a Hive query. Convenience wrapper that forwards every argument to
# #query with the job type fixed to :hive.
#
# @param q [String] the query text
# @param db [String, nil] target database
# @param result_url [String, nil] forwarded to #query as the result target
# @param priority [Object, nil] forwarded to #query
# @param retry_limit [Object, nil] forwarded to #query
# @param opts [Hash] extra request parameters forwarded to #query
# @return [String] the job_id returned by #query
def hive_query(q, db = nil, result_url = nil, priority = nil, retry_limit = nil, opts = {})
  engine = :hive
  query(q, engine, db, result_url, priority, retry_limit, opts)
end
#job_result(job_id) ⇒ Array
# Downloads a job's result and decodes it as a MessagePack stream, collecting
# every decoded row into an in-memory Array.
#
# @param job_id [String, Integer] the job whose result is fetched
# @return [Array] all decoded rows, in the order they were decoded
def job_result(job_id)
  decoder = MessagePack::Unpacker.new
  [].tap do |rows|
    # one unpacker is reused for every downloaded chunk
    job_result_download(job_id) do |chunk|
      decoder.feed_each(chunk) { |row| rows << row }
    end
  end
end
#job_result_each(job_id, &block) ⇒ nil
block is optional and must accept 1 argument
# Streams the decoded rows of a job result to the given block, one row per
# call, instead of collecting them in memory.
#
# The download is left at job_result_download's default decoding, since the
# format is fixed to 'msgpack'. A single MessagePack::Unpacker is reused for
# every chunk.
#
# @param job_id [String, Integer] the job whose result is streamed
# @yieldparam row [Object] one decoded row (block is optional)
# @return [nil]
def job_result_each(job_id, &block)
  stream = MessagePack::Unpacker.new
  job_result_download(job_id) { |chunk| stream.feed_each(chunk, &block) }
  nil
end
#job_result_each_with_compr_size(job_id) ⇒ nil
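The block is optional; when given, it is called with 2 arguments: each decoded row and the total size yielded by the result download (presumably the compressed size, per the method name).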
# Streams the decoded rows of a job result to the block, passing along the
# total size that job_result_download yields with each chunk (presumably the
# compressed result size, per the method name — NOTE(review): confirm).
#
# @param job_id [String, Integer] the job whose result is streamed
# @yieldparam row [Object] one decoded row
# @yieldparam total [Object] the total size yielded by the download for the
#   chunk this row came from
# @return [nil] (the block is optional; without it rows are decoded and dropped)
def job_result_each_with_compr_size(job_id)
  emit = block_given?
  stream = MessagePack::Unpacker.new
  job_result_download(job_id) do |chunk, total|
    stream.feed_each(chunk) do |row|
      yield row, total if emit
    end
  end
  nil
end
#job_result_format(job_id, format, io = nil) ⇒ nil, String
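When io is given, the result is written to io, nil is returned, and the optional block is called with 1 argument (the total size) after each chunk is written. When io is nil, the block is not used and the whole result is returned as a String.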
# Downloads a job's result in the requested format.
#
# Without io the whole result is buffered and returned as a String (any
# block is ignored). With io every chunk is written to io as it arrives and
# the optional block receives the total size after each write.
#
# @param job_id [String, Integer] the job whose result is fetched
# @param format [String] result format passed to job_result_download
# @param io [#write, nil] optional sink for the result
# @yieldparam total [Object] total size reported by the download (io mode only)
# @return [String, nil] the result body when io is nil, otherwise nil
def job_result_format(job_id, format, io=nil)
  unless io
    buffer = String.new
    job_result_download(job_id, format) { |chunk| buffer << chunk }
    return buffer
  end

  job_result_download(job_id, format) do |chunk, total|
    io.write(chunk)
    yield total if block_given?
  end
  nil
end
#job_result_raw(job_id, format, io = nil) ⇒ String
# Downloads a job's result in the requested format WITHOUT decoding it (the
# third argument to job_result_download is false).
#
# @param job_id [String, Integer] the job whose result is fetched
# @param format [String] result format passed to job_result_download
# @param io [#write, nil] optional sink; when given, chunks are written to it
# @yieldparam total [Object] total size reported by the download (io mode only)
# @return [String, nil] the raw body when io is nil, otherwise nil
def job_result_raw(job_id, format, io = nil)
  if io
    job_result_download(job_id, format, false) do |chunk, total|
      io.write(chunk)
      yield total if block_given?
    end
    return nil
  end

  raw = String.new
  job_result_download(job_id, format, false) { |chunk| raw << chunk }
  raw
end
#job_status(job_id) ⇒ String
Returns the job's status string (the 'status' field of the response body), not the HTTP status code.
# Fetches the current status of a job via /v3/job/status.
#
# Note: the value returned is the job's 'status' field from the JSON body,
# not the HTTP status code of the request.
#
# @param job_id [String, Integer] the job to query (escaped via #e)
# @return [String] the job status reported by the API
def job_status(job_id)
  code, body, res = get("/v3/job/status/#{e job_id}")
  raise_error("Get job status failed", res) unless code == "200"

  checked_json(body, %w[status])['status']
end
#kill(job_id) ⇒ String
# Requests that a job be killed via /v3/job/kill.
#
# @param job_id [String, Integer] the job to kill (escaped via #e)
# @return [Object] the 'former_status' field of the response (nil if absent)
def kill(job_id)
  code, body, res = post("/v3/job/kill/#{e job_id}")
  raise_error("Kill job failed", res) unless code == "200"

  # no keys are required in the response; former_status is read if present
  response = checked_json(body, %w[])
  response['former_status']
end
#list_jobs(from = 0, to = nil, status = nil, conditions = nil) ⇒ Array
# Lists jobs via /v3/job/list.
#
# @param from [Integer, nil] sent as 'from' (stringified) when non-nil
# @param to [Integer, nil] sent as 'to' (stringified) when non-nil
# @param status [#to_s, nil] sent as 'status' (stringified) when non-nil
# @param conditions [Hash, nil] extra request parameters merged last, so they
#   override the ones above
# @return [Array<Array>] one 15-element positional array per job:
#   [job_id, type, status, query, start_at, end_at, cpu_time, result_size,
#    result_url, priority, retry_limit, nil, database, duration, num_records]
#   type is a Symbol (:'?' when the API omits it).
def list_jobs(from=0, to=nil, status=nil, conditions=nil)
  params = {}
  params['from'] = from.to_s if from
  params['to'] = to.to_s if to
  params['status'] = status.to_s if status
  params.merge!(conditions) if conditions

  code, body, res = get("/v3/job/list", params)
  raise_error("List jobs failed", res) unless code == "200"

  js = checked_json(body, %w[jobs])
  # Build rows with map and read fields straight off the job hash, so no
  # block-local assignment clobbers the method's `status` parameter.
  js['jobs'].map do |job|
    [
      job['job_id'],
      (job['type'] || '?').to_sym,
      job['status'],
      job['query'],
      job['start_at'],
      job['end_at'],
      job['cpu_time'],
      job['result_size'], # compressed result size in msgpack.gz format
      job['result'],      # result target URL
      job['priority'],
      job['retry_limit'],
      nil,                # always nil here — NOTE(review): presumably a retired positional slot
      job['database'],
      job['duration'],
      job['num_records'],
    ]
  end
end
#pig_query(q, db = nil, result_url = nil, priority = nil, retry_limit = nil, opts = {}) ⇒ String
Returns job_id.
# Issues a Pig query. Convenience wrapper that forwards every argument to
# #query with the job type fixed to :pig.
#
# @param q [String] the query text
# @param db [String, nil] target database
# @param result_url [String, nil] forwarded to #query as the result target
# @param priority [Object, nil] forwarded to #query
# @param retry_limit [Object, nil] forwarded to #query
# @param opts [Hash] extra request parameters forwarded to #query
# @return [String] the job_id returned by #query
def pig_query(q, db = nil, result_url = nil, priority = nil, retry_limit = nil, opts = {})
  engine = :pig
  query(q, engine, db, result_url, priority, retry_limit, opts)
end
#query(q, type = :hive, db = nil, result_url = nil, priority = nil, retry_limit = nil, opts = {}) ⇒ String
Returns job_id.
# Issues a query job via /v3/job/issue/<type>/<db>.
#
# The request always carries 'query' plus every entry of opts. 'result',
# 'priority' and 'retry_limit' are added only when the corresponding argument
# is truthy, and they override same-named keys from opts.
#
# @param q [String] the query text
# @param type [Symbol, String] job type used in the URL path (default :hive)
# @param db [String, nil] database name (escaped via #e into the URL path)
# @param result_url [String, nil] sent as 'result' when given
# @param priority [Object, nil] sent as 'priority' when given
# @param retry_limit [Object, nil] sent as 'retry_limit' when given
# @param opts [Hash] additional request parameters (not mutated)
# @return [String] the issued job_id, stringified
def query(q, type=:hive, db=nil, result_url=nil, priority=nil, retry_limit=nil, opts={})
  params = { 'query' => q }.merge(opts)
  {
    'result' => result_url,
    'priority' => priority,
    'retry_limit' => retry_limit,
  }.each { |key, value| params[key] = value if value }

  code, body, res = post("/v3/job/issue/#{type}/#{e db}", params)
  raise_error("Query failed", res) unless code == "200"

  checked_json(body, %w[job_id])['job_id'].to_s
end
#show_job(job_id) ⇒ Array
# Fetches the details of a single job via /v3/job/show/<job_id>.
#
# A non-"200" response is reported through raise_error; the JSON body must
# contain a 'status' field.
#
# @param job_id [String, Integer] the job to show (escaped via #e)
# @return [Array] a 17-element positional array:
#   [type, query, status, url, debug, start_at, end_at, cpu_time, result_size,
#    result, hive_result_schema, priority, retry_limit, nil, database,
#    duration, num_records]
#   type is a Symbol (:'?' when the API omits it); hive_result_schema is the
#   decoded schema Array, or nil when the API returns none/empty.
# @raise [JSON::ParserError] when hive_result_schema is not valid JSON and the
#   Pig fallback below cannot recover a 2-dimensional array from it
def show_job(job_id)
  # use v3/job/status instead of v3/job/show to poll finish of a job
  code, body, res = get("/v3/job/show/#{e job_id}")
  raise_error("Show job failed", res) unless code == "200"

  js = checked_json(body, %w[status])
  # TODO debug
  type = (js['type'] || '?').to_sym  # TODO
  database = js['database']
  query = js['query']
  status = js['status']
  debug = js['debug']
  url = js['url']
  start_at = js['start_at']
  end_at = js['end_at']
  cpu_time = js['cpu_time']
  result_size = js['result_size'] # compressed result size in msgpack.gz format
  num_records = js['num_records']
  duration = js['duration']
  result = js['result'] # result target URL
  priority = js['priority']
  retry_limit = js['retry_limit']

  raw_schema = js['hive_result_schema'] || ''
  hive_result_schema =
    if raw_schema.empty?
      nil
    else
      begin
        JSON.parse(raw_schema)
      rescue JSON::ParserError => parse_error
        # Workaround for a Known Limitation in the Pig engine: it does not set
        # a default, auto-generated column name for anonymous columns (e.g.
        # those generated by UDFs like COUNT or SUM). The schema then contains
        # a bare `nil` name, which is not valid JSON. Only Pig schemas without
        # braces are retried; everything else re-raises the JSON error.
        raise parse_error unless type == :pig && raw_schema !~ /[\{\}]/

        begin
          # NOTE: this works because a JSON 2-dimensional array is also a
          # valid Ruby array literal.
          # SECURITY(review): eval executes text received from the API as
          # Ruby code inside this method; a tampered response could run
          # arbitrary code. Consider replacing it with JSON.parse after
          # rewriting bare `nil` tokens (outside string literals) to `null`.
          fallback = eval(raw_schema)
        rescue ScriptError, StandardError
          # Any failure of the fallback (SyntaxError, NameError, ...) surfaces
          # as the original JSON error, as this workaround intends.
          raise parse_error
        end
        # The fallback is only meaningful for a 2-dimensional array.
        unless fallback.is_a?(Array) && fallback.all? { |col| col.is_a?(Array) }
          raise parse_error
        end

        fallback.each_with_index do |col_schema, idx|
          col_schema[0] = "_col#{idx}" if col_schema[0].nil?
        end
        fallback
      end
    end

  [type, query, status, url, debug, start_at, end_at, cpu_time, result_size, result,
   hive_result_schema, priority, retry_limit, nil, database, duration, num_records]
end