Class: SalesforceBulkQuery::Query

Inherits:
Object
  • Object
show all
Defined in:
lib/salesforce_bulk_query/query.rb

Overview

Abstraction of a single user-given query. It contains multiple jobs and is tied to a specific connection.

Constant Summary collapse

OFFSET_FROM_NOW =

If no created_to is given, we use the current time minus this offset (in minutes), to make sure the freshest changes — which can still be inconsistent — are not included.

10
DEFAULT_MIN_CREATED =
"1999-01-01T00:00:00.000Z"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sobject, soql, connection, options = {}) ⇒ Query

Returns a new instance of Query.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/salesforce_bulk_query/query.rb', line 14

# Set up a query over the given connection.
#
# @param sobject [String] name of the Salesforce object being queried
# @param soql [String] the soql query string
# @param connection [Object] connection the jobs will run against
# @param options [Hash] :logger, :created_from, :created_to, :single_batch
def initialize(sobject, soql, connection, options={})
  @sobject = sobject
  @soql = soql
  @connection = connection

  # pull the recognized options out of the hash
  @logger, @created_from, @created_to, @single_batch =
    options.values_at(:logger, :created_from, :created_to, :single_batch)

  # job bookkeeping:
  # - currently running
  # - successfully finished with no batches to split
  # - finished or timed-out with some batches split into other jobs
  @jobs_in_progress, @jobs_done, @jobs_restarted = [], [], []

  # downloaded result files and the soqls of re-run subqueries
  @finished_batch_filenames = []
  @restarted_subqueries = []
end

Instance Attribute Details

#jobs_doneObject (readonly)

Returns the value of attribute jobs_done.



36
37
38
# File 'lib/salesforce_bulk_query/query.rb', line 36

# @return [Array] successfully finished jobs that had no batches left to split
def jobs_done
  @jobs_done
end

#jobs_in_progressObject (readonly)

Returns the value of attribute jobs_in_progress.



36
37
38
# File 'lib/salesforce_bulk_query/query.rb', line 36

# @return [Array] jobs currently running (including jobs created to re-run split batches)
def jobs_in_progress
  @jobs_in_progress
end

#jobs_restartedObject (readonly)

Returns the value of attribute jobs_restarted.



36
37
38
# File 'lib/salesforce_bulk_query/query.rb', line 36

# @return [Array] finished or timed-out jobs that had some batches split into other jobs
def jobs_restarted
  @jobs_restarted
end

Instance Method Details

#get_available_results(options = {}) ⇒ Object

Get results for all finished jobs. If there are some unfinished batches, skip them and return them as unfinished.

Parameters:

  • options (:directory_path) (defaults to: {})


79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/salesforce_bulk_query/query.rb', line 79

# Get results for all finished jobs. If there are some unfinished batches,
# skip them and return them as unfinished. Batches that failed result
# verification, and unfinished batches of jobs that ran over the limit,
# are split into new jobs and re-run.
#
# @param options [Hash] :directory_path and anything accepted by
#   Job#get_available_results / Job.new
# @return [Hash] :succeeded (true when nothing is left in progress),
#   :filenames of all downloaded batch results, :unfinished_subqueries,
#   :jobs_done (job ids)
def get_available_results(options={})

  unfinished_subqueries = []
  jobs_in_progress = []
  jobs_restarted = []
  jobs_done = []

  # check all jobs statuses and split what should be split
  @jobs_in_progress.each do |job|

    # download what's available
    job_results = job.get_available_results(options)

    job_over_limit = job.over_limit?
    job_done = job_results[:finished] || job_over_limit

    @logger.debug "job_results: #{job_results}" if @logger

    unfinished_batches = job_results[:unfinished_batches]
    verification_fail_batches = job_results[:verification_fail_batches]

    unfinished_subqueries += unfinished_batches.map {|b| b.soql}

    # split to subqueries what needs to be split
    to_split = verification_fail_batches
    to_split += unfinished_batches if job_over_limit

    # delete files associated with batches that failed verification
    # (guard the logger call -- @logger may be nil when no :logger was given)
    verification_fail_batches.each do |b|
      @logger.info "Deleting #{b.filename}, verification failed." if @logger
      File.delete(b.filename)
    end

    to_split.each do |batch|
      # for each unfinished batch create a new job and add it to new jobs
      @logger.info "The following subquery didn't end in time / failed verification: #{batch.soql}. Dividing into multiple and running again" if @logger
      new_job = SalesforceBulkQuery::Job.new(@sobject, @connection, {:logger => @logger}.merge(options))
      new_job.create_job
      new_job.generate_batches(@soql, batch.start, batch.stop)
      new_job.close_job
      jobs_in_progress.push(new_job)
    end

    # what to do with the current job
    # finish, some stuff restarted
    if job_done
      if to_split.empty?
        # done, nothing left
        jobs_done.push(job)

        @logger.info "#{job.job_id} finished. Nothing to split. unfinished_batches: #{unfinished_batches}, verification_fail_batches: #{verification_fail_batches}" if @logger
      else
        # done, some batches needed to be restarted
        jobs_restarted.push(job)
      end

      # store the filenames and restarted stuff
      @finished_batch_filenames += job_results[:filenames]
      @restarted_subqueries += to_split.map {|b| b.soql}
    else
      # still in progress
      jobs_in_progress.push(job)
    end
  end

  # remove the finished jobs from progress and add there the new ones;
  # also record the restarted jobs so the jobs_restarted reader reflects them
  @jobs_in_progress = jobs_in_progress
  @jobs_restarted += jobs_restarted
  @jobs_done += jobs_done

  # we're done if there're no jobs in progress
  return {
    :succeeded => @jobs_in_progress.empty?,
    :filenames => @finished_batch_filenames,
    :unfinished_subqueries => unfinished_subqueries,
    :jobs_done => @jobs_done.map { |j| j.job_id }
  }
end

#start(options = {}) ⇒ Object

Creates the first job, divides the query to subqueries, puts all the subqueries as batches to the job



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/salesforce_bulk_query/query.rb', line 41

# Creates the first job, divides the query to subqueries, puts all the
# subqueries as batches to the job.
#
# @param options [Hash] passed down to Job.new; :offset_from_now (minutes)
#   overrides OFFSET_FROM_NOW when no created_to was given
# @raise [RuntimeError] when the soql contains WHERE / ORDER BY and the
#   query isn't single-batch -- the query is divided on CreatedDate, so
#   user-given filtering / ordering would clash with the generated intervals
def start(options={})
  # order by and where not allowed
  if (!@single_batch) && (@soql =~ /WHERE/i || @soql =~ /ORDER BY/i)
    raise "You can't have WHERE or ORDER BY in your soql. If you want to download just specific date range use created_from / created_to"
  end

  # create the first job
  job = SalesforceBulkQuery::Job.new(@sobject, @connection, {:logger => @logger}.merge(options))
  job.create_job

  # get the date when it should start
  if @created_from
    min_created = @created_from
  else
    # get the date when the first record was created
    min_created = nil
    begin
      min_created_resp = @connection.client.query("SELECT CreatedDate FROM #{@sobject} ORDER BY CreatedDate LIMIT 1")
      min_created_resp.each {|s| min_created = s[:CreatedDate]}
    rescue Faraday::Error::TimeoutError => e
      @logger.warn "Timeout getting the oldest object for #{@sobject}. Error: #{e}. Using the default value" if @logger
      min_created = DEFAULT_MIN_CREATED
    end
    # the sobject may be empty (query returned no rows) -- fall back to the
    # default so DateTime.parse below doesn't blow up on nil
    min_created ||= DEFAULT_MIN_CREATED
  end

  # generate intervals
  start = DateTime.parse(min_created)
  stop = @created_to ? DateTime.parse(@created_to) : DateTime.now - Rational(options[:offset_from_now] || OFFSET_FROM_NOW, 1440)
  job.generate_batches(@soql, start, stop, @single_batch)

  job.close_job

  @jobs_in_progress.push(job)
end