Class: SalesforceBulkQuery::Job
- Inherits:
-
Object
- Object
- SalesforceBulkQuery::Job
- Defined in:
- lib/salesforce_bulk_query/job.rb
Overview
Represents a Salesforce Bulk API job, which contains multiple batches. A Query contains many jobs.
Constant Summary collapse
- JOB_TIME_LIMIT =
15 * 60
- BATCH_COUNT =
15
- @@operation =
'query'
- @@xml_header =
'<?xml version="1.0" encoding="utf-8" ?>'
Instance Attribute Summary collapse
-
#job_id ⇒ Object
readonly
Returns the value of attribute job_id.
Instance Method Summary collapse
- #add_query(query, options = {}) ⇒ Object
- #check_status ⇒ Object
- #close_job ⇒ Object
-
#create_job(csv = true) ⇒ Object
Do the API request.
- #generate_batches(soql, start, stop, single_batch = false) ⇒ Object
-
#get_available_results(options = {}) ⇒ Object
Downloads whatever results are available; batches that are not ready yet are returned as unfinished.
- #get_extended_soql(soql, from, to) ⇒ Object
-
#initialize(sobject, connection, options = {}) ⇒ Job
constructor
A new instance of Job.
- #over_limit? ⇒ Boolean
- #to_log ⇒ Object
Constructor Details
#initialize(sobject, connection, options = {}) ⇒ Job
Returns a new instance of Job.
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/salesforce_bulk_query/job.rb', line 14

# Stores the job configuration and prepares empty batch bookkeeping.
# No API request is made here.
#
# @param sobject [Object] stored in @sobject
# @param connection [Object] stored in @connection
# @param options [Hash] recognised keys:
#   :logger         - optional logger, stored in @logger
#   :job_time_limit - optional, falls back to JOB_TIME_LIMIT
#   :date_field     - required
# @raise [RuntimeError] when options[:date_field] is nil or false
def initialize(sobject, connection, options = {})
  @sobject = sobject
  @connection = connection
  @logger = options[:logger]
  @job_time_limit = options[:job_time_limit] || JOB_TIME_LIMIT

  @date_field = options[:date_field]
  fail "date_field must be given when creating a batch" unless @date_field

  # all batches (static)
  @batches = []

  # unfinished batches as of last get_available_results call
  @unfinished_batches = []

  # filenames for the already downloaded and verified batches
  @filenames = []
end
Instance Attribute Details
#job_id ⇒ Object (readonly)
Returns the value of attribute job_id.
31 32 33 |
# File 'lib/salesforce_bulk_query/job.rb', line 31

# Returns the value of attribute job_id.
#
# @return [Object, nil] the current value of @job_id (nil while it is unset)
def job_id
  @job_id
end
Instance Method Details
#add_query(query, options = {}) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/salesforce_bulk_query/job.rb', line 83

# Builds a batch for the given query, creates it and registers it with
# this job as both a known and an unfinished batch.
#
# @param query [String] SOQL passed to the batch as :soql
# @param options [Hash] :start and :stop are forwarded to the batch
# @return [Array] the list of unfinished batches, including the new one
def add_query(query, options = {})
  # build and create a batch
  batch = SalesforceBulkQuery::Batch.new(
    :sobject => @sobject,
    :soql => query,
    :job_id => @job_id,
    :connection => @connection,
    :start => options[:start],
    :stop => options[:stop],
    :logger => @logger,
    :date_field => @date_field
  )
  batch.create

  # add the batch to both lists
  @batches.push(batch)
  @unfinished_batches.push(batch)
end
#check_status ⇒ Object
113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/salesforce_bulk_query/job.rb', line 113

# Fetches the job info from "job/<job_id>" and summarises it.
# Also stores the completed-batch count in @completed_count and the
# success flag in @succeeded.
#
# @return [Hash] with keys
#   :succeeded           - completed batch count equals total batch count
#   :some_records_failed - numberRecordsFailed is greater than zero
#   :some_batches_failed - numberBatchesFailed is greater than zero
#   :response            - the parsed response as returned by the connection
# @raise [ArgumentError, TypeError] from Integer() when a counter is not numeric
def check_status
  response_parsed = @connection.get_xml("job/#{@job_id}")

  # each counter is read as the first element of its field and converted strictly
  counter = lambda { |field| Integer(response_parsed[field][0]) }

  @completed_count = counter.call("numberBatchesCompleted")
  @succeeded = @completed_count == counter.call("numberBatchesTotal")

  {
    :succeeded => @succeeded,
    :some_records_failed => counter.call("numberRecordsFailed") > 0,
    :some_batches_failed => counter.call("numberBatchesFailed") > 0,
    :response => response_parsed
  }
end
#close_job ⇒ Object
102 103 104 105 106 107 108 109 110 111 |
# File 'lib/salesforce_bulk_query/job.rb', line 102

# Posts a jobInfo document with state "Closed" to "job/<job_id>" and
# remembers the time of the call in @job_closed_time.
#
# @return [Time] the time recorded as the closing time
def close_job
  xml = "#{@@xml_header}<jobInfo xmlns=\"http://www.force.com/2009/06/asyncapi/dataload\">" \
        "<state>Closed</state>" \
        "</jobInfo>"

  # the parsed response is not used
  @connection.post_xml("job/#{@job_id}", xml)
  @job_closed_time = Time.now
end
#create_job(csv = true) ⇒ Object
Do the API request
34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/salesforce_bulk_query/job.rb', line 34

# Do the API request: posts a jobInfo document to "job" and stores the id
# from the response in @job_id.
#
# @param csv [Boolean] truthy selects content type "CSV", otherwise "XML"
# @return [Object] the first element of the response's 'id' field
def create_job(csv = true)
  content_type = csv ? "CSV" : "XML"

  xml = "#{@@xml_header}<jobInfo xmlns=\"http://www.force.com/2009/06/asyncapi/dataload\">" \
        "<operation>#{@@operation}</operation>" \
        "<object>#{@sobject}</object>" \
        "<contentType>#{content_type}</contentType>" \
        "</jobInfo>"

  response_parsed = @connection.post_xml("job", xml)
  @job_id = response_parsed['id'][0]
end
#generate_batches(soql, start, stop, single_batch = false) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/salesforce_bulk_query/job.rb', line 50

# Adds the query to the job, either as one batch covering [start, stop) or
# split into consecutive intervals of (stop - start) / BATCH_COUNT each.
#
# @param soql [String] base query; the date filter is appended via get_extended_soql
# @param start [Object] lower bound; must support `-` and `step`
# @param stop [Object] upper bound
# @param single_batch [Boolean] when truthy, do not split the interval
# @return [nil, Array] nil when a single batch was added, otherwise the
#   list of [from, to] pairs that were added
def generate_batches(soql, start, stop, single_batch = false)
  add_interval = lambda do |from, to|
    soql_extended = get_extended_soql(soql, from, to)
    @logger.info "Adding soql #{soql_extended} as a batch to job" if @logger

    add_query(soql_extended,
      :start => from,
      :stop => to
    )
  end

  step_size = (stop - start) / BATCH_COUNT unless single_batch

  # A zero step cannot be iterated (`step` raises or never terminates), so an
  # empty or too-narrow interval is sent as one batch as well.
  if single_batch || step_size.zero?
    add_interval.call(start, stop)
    return
  end

  # interval boundaries: every interval begins where the previous one ends,
  # and the last one always ends exactly at stop
  boundaries = start.step(stop - step_size, step_size).to_a
  boundaries << stop

  boundaries.each_cons(2).to_a.each do |from, to|
    add_interval.call(from, to)
  end
end
#get_available_results(options = {}) ⇒ Object
Downloads whatever results are available; batches that are not ready yet are returned as unfinished.
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/salesforce_bulk_query/job.rb', line 133

# Downloads whatever is available, returns as unfinished whatever is not.
# Only batches still listed in @unfinished_batches are checked.
#
# @param options [Hash] accepted but not read by this method
# @return [Hash] with keys
#   :finished                  - true when no unfinished batches remain
#   :filenames                 - all filenames downloaded so far (cumulative)
#   :unfinished_batches        - batches neither succeeded nor failed yet
#   :verification_fail_batches - batches whose result had :verification == false
# @raise [ArgumentError] when at least one batch reports :failed
def get_available_results(options = {})
  downloaded_filenames = []
  unfinished_batches = []
  verification_fail_batches = []
  failed_batches = []

  # get result for each batch in the job
  @unfinished_batches.each do |batch|
    batch_status = batch.check_status

    if batch_status[:succeeded]
      # each finished batch should go here only once: download the result
      result = batch.get_result
      @logger.info "get_result result: #{result}" if @logger

      if result[:verification] == false
        # verification failed; it is not put back to unfinished, so it
        # will never be asked about again
        verification_fail_batches << batch
      else
        # verification ok and finished
        downloaded_filenames << result[:filename]
      end
    elsif batch_status[:failed]
      # collected here, error raised after the loop
      failed_batches << batch
    else
      unfinished_batches << batch
    end
  end

  unless failed_batches.empty?
    # NOTE(review): the accessor name after `b.` was lost in this listing;
    # `fail_message` is assumed — confirm against SalesforceBulkQuery::Batch.
    details = failed_batches.map { |b| "#{b.batch_id}: #{b.fail_message}" }.join("\n")
    fail ArgumentError, "#{failed_batches.length} batches failed. Details: #{details}"
  end

  # cache the unfinished_batches till the next run
  @unfinished_batches = unfinished_batches

  # cumulate filenames
  @filenames += downloaded_filenames

  @logger.info "unfinished batches: #{unfinished_batches}\nverification_fail_batches: #{verification_fail_batches}" if @logger

  {
    :finished => @unfinished_batches.empty?,
    :filenames => @filenames,
    :unfinished_batches => @unfinished_batches,
    :verification_fail_batches => verification_fail_batches
  }
end
#get_extended_soql(soql, from, to) ⇒ Object
46 47 48 |
# File 'lib/salesforce_bulk_query/job.rb', line 46

# Appends a half-open date filter on @date_field to the query:
# from is included, to is excluded.
#
# @param soql [String] base query; a WHERE clause is appended verbatim
# @param from [Object] lower bound, interpolated with to_s
# @param to [Object] upper bound, interpolated with to_s
# @return [String] the extended query
def get_extended_soql(soql, from, to)
  "#{soql} WHERE #{@date_field} >= #{from} AND #{@date_field} < #{to}"
end
#over_limit? ⇒ Boolean
127 128 129 |
# File 'lib/salesforce_bulk_query/job.rb', line 127

# Tells whether more than @job_time_limit seconds have passed since the
# time recorded in @job_closed_time.
#
# @return [Boolean] false while no closing time has been recorded
def over_limit?
  # without a closing time there is nothing to measure against
  return false unless @job_closed_time

  (Time.now - @job_closed_time) > @job_time_limit
end
#to_log ⇒ Object
189 190 191 192 193 194 195 196 |
# File 'lib/salesforce_bulk_query/job.rb', line 189

# Builds a loggable summary of the job from the `to_log` output of its
# connection and batches.
#
# @return [Hash] with keys :sobject, :connection, :batches, :unfinished_batches
def to_log
  {
    :sobject => @sobject,
    :connection => @connection.to_log,
    :batches => @batches.map(&:to_log),
    :unfinished_batches => @unfinished_batches.map(&:to_log)
  }
end