Class: SalesforceBulkQuery::Query

Inherits:
Object
  • Object
show all
Defined in:
lib/salesforce_bulk_query/query.rb

Overview

Abstraction of a single user-given query. It contains multiple jobs, is tied to a specific connection

Constant Summary collapse

OFFSET_FROM_NOW =

if no date_to is given we use the current time with this offset subtracted (to make sure the freshest changes that can be inconsistent aren’t there) It’s in minutes

10
DEFAULT_DATE_FIELD =
'CreatedDate'
DEFAULT_MIN_CREATED =
"1999-01-01T00:00:00.000Z"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sobject, soql, connection, options = {}) ⇒ Query

Returns a new instance of Query.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/salesforce_bulk_query/query.rb', line 16

def initialize(sobject, soql, connection, options={})
  @sobject = sobject
  @soql = soql
  @connection = connection
  @logger = options[:logger]
  @date_field = options[:date_field] || DEFAULT_DATE_FIELD
  @date_from = options[:date_from]
  @date_to = options[:date_to]
  @single_batch = options[:single_batch]

  # jobs currently running
  @jobs_in_progress = []

  # successfully finished jobs with no batches to split
  @jobs_done = []

  # finished or timeouted jobs with some batches split into other jobs
  @jobs_restarted = []

  @finished_batch_filenames = []
  @restarted_subqueries = []
end

Instance Attribute Details

#jobs_doneObject (readonly)

Returns the value of attribute jobs_done.



39
40
41
# File 'lib/salesforce_bulk_query/query.rb', line 39

def jobs_done
  @jobs_done
end

#jobs_in_progressObject (readonly)

Returns the value of attribute jobs_in_progress.



39
40
41
# File 'lib/salesforce_bulk_query/query.rb', line 39

def jobs_in_progress
  @jobs_in_progress
end

#jobs_restartedObject (readonly)

Returns the value of attribute jobs_restarted.



39
40
41
# File 'lib/salesforce_bulk_query/query.rb', line 39

def jobs_restarted
  @jobs_restarted
end

Instance Method Details

#get_available_results(options = {}) ⇒ Object

Get results for all finished jobs. If there are some unfinished batches, skip them and return them as unfinished.

Parameters:

  • options (:directory_path) (defaults to: {})


74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/salesforce_bulk_query/query.rb', line 74

def get_available_results(options={})

  unfinished_subqueries = []
  jobs_in_progress = []
  jobs_restarted = []
  jobs_done = []

  # check all jobs statuses and split what should be split
  @jobs_in_progress.each do |job|

    # download what's available
    job_results = job.get_available_results(options)

    job_over_limit = job.over_limit?
    job_done = job_results[:finished] || job_over_limit

    @logger.debug "job_results: #{job_results}" if @logger

    unfinished_batches = job_results[:unfinished_batches]
    verification_fail_batches = job_results[:verification_fail_batches]

    unfinished_subqueries += unfinished_batches.map {|b| b.soql}

    # split to subqueries what needs to be split
    to_split = verification_fail_batches
    to_split += unfinished_batches if job_over_limit

    # delete files associated with batches that failed verification
    verification_fail_batches.each do |b|
      @logger.info "Deleting #{b.filename}, verification failed."
      File.delete(b.filename)
    end

    to_split.each do |batch|
      # for each unfinished batch create a new job and add it to new jobs
      @logger.info "The following subquery didn't end in time / failed verification: #{batch.soql}. Dividing into multiple and running again" if @logger
      new_job = SalesforceBulkQuery::Job.new(
        @sobject,
        @connection,
        {:logger => @logger, :date_field => @date_field}.merge(options)
      )
      new_job.create_job
      new_job.generate_batches(@soql, batch.start, batch.stop)
      new_job.close_job
      jobs_in_progress.push(new_job)
    end

    # what to do with the current job
    # finish, some stuff restarted
    if job_done
      if to_split.empty?
        # done, nothing left
        jobs_done.push(job)

        @logger.info "#{job.job_id} finished. Nothing to split. unfinished_batches: #{unfinished_batches}, verification_fail_batches: #{verification_fail_batches}" if @logger
      else
        # done, some batches needed to be restarted
        jobs_restarted.push(job)
      end

      # store the filenames and restarted stuff
      @finished_batch_filenames += job_results[:filenames]
      @restarted_subqueries += to_split.map {|b| b.soql}
    else
      # still in progress
      jobs_in_progress.push(job)
    end
  end

  # remove the finished jobs from progress and add there the new ones
  @jobs_in_progress = jobs_in_progress
  @jobs_done += jobs_done

  # we're done if there're no jobs in progress
  return {
    :succeeded => @jobs_in_progress.empty?,
    :filenames => @finished_batch_filenames,
    :unfinished_subqueries => unfinished_subqueries,
    :jobs_done => @jobs_done.map { |j| j.job_id }
  }
end

#start(options = {}) ⇒ Object

Creates the first job, divides the query to subqueries, puts all the subqueries as batches to the job



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/salesforce_bulk_query/query.rb', line 44

def start(options={})
  # order by and where not allowed
  if (!@single_batch) && (@soql =~ /WHERE/i || @soql =~ /ORDER BY/i)
    raise "You can't have WHERE or ORDER BY in your soql. If you want to download just specific date range use date_from / date_to"
  end

  # create the first job
  job = SalesforceBulkQuery::Job.new(
    @sobject,
    @connection,
    {:logger => @logger, :date_field => @date_field}.merge(options)
  )
  job.create_job

  # get the date when it should start
  min_date = get_min_date

  # generate intervals
  start = DateTime.parse(min_date)
  stop = @date_to ? DateTime.parse(@date_to) : DateTime.now - Rational(options[:offset_from_now] || OFFSET_FROM_NOW, 1440)
  job.generate_batches(@soql, start, stop, @single_batch)

  job.close_job

  @jobs_in_progress.push(job)
end