Class: SalesforceBulk::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/salesforce_bulk/client.rb

Overview

Interface for operating the Salesforce Bulk REST API

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Client

Returns a new instance of Client.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/salesforce_bulk/client.rb', line 20

# Builds a client from an options hash or from the path to a YAML file.
#
# @param options [Hash, String] either a hash of settings or the path to a
#   YAML file containing them. Accepted keys are :username, :password,
#   :login_host (defaults to 'login.salesforce.com') and :version
#   (defaults to 24.0). Any other key is rejected by assert_valid_keys.
def initialize(options={})
  if options.is_a?(String)
    options = YAML.load_file(options)
    options.symbolize_keys!
  end

  options = {:login_host => 'login.salesforce.com', :version => 24.0}.merge(options)

  options.assert_valid_keys(:username, :password, :login_host, :version)

  self.username = options[:username]
  # Interpolation coerces the password to a String (nil becomes "").
  self.password = "#{options[:password]}"
  self.login_host = options[:login_host]
  self.version = options[:version]

  # All Bulk API paths are built relative to this version-specific prefix.
  @api_path_prefix = "/services/async/#{version}/"
  @valid_operations = [:delete, :insert, :update, :upsert, :query]
  @valid_concurrency_modes = ['Parallel', 'Serial']
end

Instance Attribute Details

#instance_host ⇒ Object

The instance host to use for API calls. Determined from login response.



9
10
11
# File 'lib/salesforce_bulk/client.rb', line 9

# Reader for the instance host used for API calls.
#
# @return [Object] the current value of @instance_host (nil until assigned)
def instance_host; @instance_host; end

#login_host ⇒ Object

The host to use for authentication. Defaults to login.salesforce.com.



6
7
8
# File 'lib/salesforce_bulk/client.rb', line 6

# Reader for the host used for authentication.
#
# @return [Object] the current value of @login_host (nil until assigned)
def login_host
  @login_host
end

#password ⇒ Object

The Salesforce password



12
13
14
# File 'lib/salesforce_bulk/client.rb', line 12

# Reader for the Salesforce password.
#
# @return [Object] the current value of @password (nil until assigned)
def password; @password; end

#username ⇒ Object

The Salesforce username



15
16
17
# File 'lib/salesforce_bulk/client.rb', line 15

# Reader for the Salesforce username.
#
# @return [Object] the current value of @username (nil until assigned)
def username; @username; end

#version ⇒ Object

The API version the client is using. Defaults to 24.0.



18
19
20
# File 'lib/salesforce_bulk/client.rb', line 18

# Reader for the API version the client is using.
#
# @return [Object] the current value of @version (nil until assigned)
def version; @version; end

Instance Method Details

#abort_job(jobId) ⇒ Object



64
65
66
67
68
69
70
71
72
73
# File 'lib/salesforce_bulk/client.rb', line 64

# Posts a jobInfo document with state "Aborted" for the given job and wraps
# the parsed response in a Job.
#
# @param jobId [String] identifier interpolated into the "job/<id>" path
# @return [Object] whatever Job.new_from_xml builds from the parsed response
def abort_job(jobId)
  payload = [
    '<?xml version="1.0" encoding="utf-8"?>',
    '<jobInfo xmlns="http://www.force.com/2009/06/asyncapi/dataload">',
    '<state>Aborted</state>',
    '</jobInfo>'
  ].join

  reply = http_post("job/#{jobId}", payload)
  parsed = XmlSimple.xml_in(reply.body, 'ForceArray' => false)
  Job.new_from_xml(parsed)
end

#add_batch(jobId, data) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/salesforce_bulk/client.rb', line 75

# Adds a batch to an existing job.
#
# @param jobId [String] identifier interpolated into the "job/<id>/batch" path
# @param data [Array<Hash>, Object] when an Array, each element is treated as
#   a record and the set is serialised to CSV using the keys of the first
#   record as the header row; any other value is posted as the body as-is
# @return [Object] whatever Batch.new_from_xml builds from the parsed response
# @raise [ArgumentError] if data is an empty Array or has more than 10000 records
def add_batch(jobId, data)
  body = data

  if data.is_a?(Array)
    # Without a first record there is no header row to derive.
    raise ArgumentError, "Data set is empty" if data.empty?
    raise ArgumentError, "Data set exceeds 10000 record limit by #{data.length - 10000}" if data.length > 10000

    keys = data.first.keys
    body = keys.to_csv

    # Append in place; values are looked up by the header keys so every row
    # has its columns in the same order as the header.
    data.each do |item|
      body << keys.map { |key| item[key] }.to_csv
    end
  end

  # Despite the content for a query operation batch being plain text we
  # still have to specify CSV content type per API docs.
  response = http_post("job/#{jobId}/batch", body, "Content-Type" => "text/csv; charset=UTF-8")
  result = XmlSimple.xml_in(response.body, 'ForceArray' => false)
  Batch.new_from_xml(result)
end

#add_job(operation, sobject, options = {}) ⇒ Object

Raises:

  • (ArgumentError)


97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/salesforce_bulk/client.rb', line 97

# Creates a new job by posting a jobInfo document to the "job" path.
#
# @param operation [String, Symbol] one of the operations in @valid_operations
#   (compared case-insensitively)
# @param sobject [String] object name placed in the <object> element
# @param options [Hash] optional :external_id_field_name and
#   :concurrency_mode (String or Symbol, compared case-insensitively against
#   @valid_concurrency_modes)
# @return [Object] whatever Job.new_from_xml builds from the parsed response
# @raise [ArgumentError] if the operation or concurrency mode is not valid
def add_job(operation, sobject, options={})
  operation = operation.to_s.downcase.to_sym

  raise ArgumentError, "Invalid operation: #{operation}" unless @valid_operations.include?(operation)

  options.assert_valid_keys(:external_id_field_name, :concurrency_mode)

  concurrency_mode = nil
  if options[:concurrency_mode]
    # to_s so that Symbol values normalise to the same String form.
    concurrency_mode = options[:concurrency_mode].to_s.capitalize
    raise ArgumentError, "Invalid concurrency mode: #{concurrency_mode}" unless @valid_concurrency_modes.include?(concurrency_mode)
  end

  xml = '<?xml version="1.0" encoding="utf-8"?>'
  xml += '<jobInfo xmlns="http://www.force.com/2009/06/asyncapi/dataload">'
  xml += "<operation>#{operation}</operation>"
  xml += "<object>#{sobject}</object>"
  xml += "<externalIdFieldName>#{options[:external_id_field_name]}</externalIdFieldName>" if options[:external_id_field_name]
  # Send the normalised value that was validated, not the caller's raw input.
  xml += "<concurrencyMode>#{concurrency_mode}</concurrencyMode>" if concurrency_mode
  xml += "<contentType>CSV</contentType>"
  xml += "</jobInfo>"

  response = http_post("job", xml)
  data = XmlSimple.xml_in(response.body, 'ForceArray' => false)
  Job.new_from_xml(data)
end

#authenticate ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/salesforce_bulk/client.rb', line 40

# Logs in by posting a SOAP login envelope, then stores the session id and
# derives the instance host from the server URL in the login result.
#
# @return [self] the receiver
def authenticate
  # Escape &, < and > so credentials containing them cannot break the
  # envelope markup.
  safe_username = username.to_s.encode(:xml => :text)
  safe_password = password.to_s.encode(:xml => :text)

  xml = '<?xml version="1.0" encoding="utf-8"?>'
  xml += '<env:Envelope xmlns:xsd="http://www.w3.org/2001/XMLSchema"'
  xml += ' xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"'
  xml += ' xmlns:env="http://schemas.xmlsoap.org/soap/envelope/">'
  xml += "<env:Body>"
  xml += '<n1:login xmlns:n1="urn:partner.soap.sforce.com">'
  xml += "<n1:username>#{safe_username}</n1:username>"
  xml += "<n1:password>#{safe_password}</n1:password>"
  xml += "</n1:login>"
  xml += "</env:Body>"
  xml += "</env:Envelope>\n"

  response = http_post("/services/Soap/u/#{version}", xml, 'Content-Type' => 'text/xml', 'SOAPAction' => 'login')

  data = XmlSimple.xml_in(response.body, 'ForceArray' => false)
  result = data['Body']['loginResponse']['result']

  @session_id = result['sessionId']

  self.instance_host = "#{instance_id(result['serverUrl'])}.salesforce.com"
  self
end

#batch_info(jobId, batchId) ⇒ Object



136
137
138
139
140
# File 'lib/salesforce_bulk/client.rb', line 136

# Fetches "job/<jobId>/batch/<batchId>" and wraps the parsed response in a Batch.
#
# @param jobId [String] job identifier used in the request path
# @param batchId [String] batch identifier used in the request path
# @return [Object] whatever Batch.new_from_xml builds from the parsed response
def batch_info(jobId, batchId)
  body = http_get("job/#{jobId}/batch/#{batchId}").body
  Batch.new_from_xml(XmlSimple.xml_in(body, 'ForceArray' => false))
end

#batch_info_list(jobId) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/salesforce_bulk/client.rb', line 123

# Fetches "job/<jobId>/batch" and wraps each batchInfo entry in a Batch.
#
# The parser is called with 'ForceArray' => false, so 'batchInfo' may be
# either an Array or a single value; both are returned as an Array.
#
# @param jobId [String] job identifier used in the request path
# @return [Array] one Batch.new_from_xml result per batchInfo entry
def batch_info_list(jobId)
  parsed = XmlSimple.xml_in(http_get("job/#{jobId}/batch").body, 'ForceArray' => false)

  entries = parsed['batchInfo']
  entries = [entries] unless entries.is_a?(Array)
  entries.map { |entry| Batch.new_from_xml(entry) }
end

#batch_result(jobId, batchId) ⇒ Object



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/salesforce_bulk/client.rb', line 142

# Fetches "job/<jobId>/batch/<batchId>/result" and interprets the body.
#
# A body containing something that looks like a tag is parsed as XML: if it
# has any 'result' entries, the first one is fetched with query_result and
# the rows are returned in a QueryResultCollection; otherwise nil is returned.
# Any other body is parsed as CSV with a header row, and each row becomes a
# BatchResult built from its first four columns.
#
# @param jobId [String] job identifier used in the request path
# @param batchId [String] batch identifier used in the request path
# @return [Object, nil] a QueryResultCollection, a BatchResultCollection, or nil
def batch_result(jobId, batchId)
  response = http_get("job/#{jobId}/batch/#{batchId}/result")

  unless response.body =~ /<.*?>/m
    rows = BatchResultCollection.new(jobId, batchId)
    CSV.parse(response.body, :headers => true) do |row|
      rows << BatchResult.new(row[0], row[1].to_b, row[2].to_b, row[3])
    end
    return rows
  end

  parsed = XmlSimple.xml_in(response.body)
  return nil unless parsed['result'].present?

  first_id = parsed['result'].first
  records = query_result(jobId, batchId, first_id)
  QueryResultCollection.new(self, jobId, batchId, first_id, parsed['result']).replace(records)
end

#close_job(jobId) ⇒ Object



182
183
184
185
186
187
188
189
190
191
# File 'lib/salesforce_bulk/client.rb', line 182

# Posts a jobInfo document with state "Closed" for the given job and wraps
# the parsed response in a Job.
#
# @param jobId [String] identifier interpolated into the "job/<id>" path
# @return [Object] whatever Job.new_from_xml builds from the parsed response
def close_job(jobId)
  payload = [
    '<?xml version="1.0" encoding="utf-8"?>',
    '<jobInfo xmlns="http://www.force.com/2009/06/asyncapi/dataload">',
    '<state>Closed</state>',
    '</jobInfo>'
  ].join

  reply = http_post("job/#{jobId}", payload)
  parsed = XmlSimple.xml_in(reply.body, 'ForceArray' => false)
  Job.new_from_xml(parsed)
end

#http_get(path, headers = {}) ⇒ Object



219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/salesforce_bulk/client.rb', line 219

# Issues a GET against the instance host for a path relative to the API prefix.
#
# @param path [String] path appended to @api_path_prefix
# @param headers [Hash] extra headers; merged over a default
#   'Content-Type' => 'application/xml'
# @return [Object] the response when it is a Net::HTTPSuccess
# @raise [SalesforceError] for any other response
def http_get(path, headers={})
  full_path = "#{@api_path_prefix}#{path}"

  request_headers = {'Content-Type' => 'application/xml'}.merge(headers)
  # Only send the session header once a session id is present.
  request_headers['X-SFDC-Session'] = @session_id if @session_id

  response = https_request(self.instance_host).get(full_path, request_headers)
  raise SalesforceError.new(response) unless response.is_a?(Net::HTTPSuccess)

  response
end

#http_post(path, body, headers = {}) ⇒ Object



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/salesforce_bulk/client.rb', line 199

# Issues a POST, choosing host and path by whether a session id is present.
#
# With a session id, the request goes to the instance host, the path is
# prefixed with @api_path_prefix and the session header is added. Without
# one, the request goes to the login host and the path is used as given.
#
# @param path [String] request path (relative to the API prefix once logged in)
# @param body [String] request body
# @param headers [Hash] extra headers; merged over a default
#   'Content-Type' => 'application/xml'
# @return [Object] the response when it is a Net::HTTPSuccess
# @raise [SalesforceError] for any other response
def http_post(path, body, headers={})
  headers = {'Content-Type' => 'application/xml'}.merge(headers)

  if @session_id
    headers['X-SFDC-Session'] = @session_id
    host = instance_host
    path = "#{@api_path_prefix}#{path}"
  else
    host = self.login_host
  end

  response = https_request(host).post(path, body, headers)

  if response.is_a?(Net::HTTPSuccess)
    response
  else
    raise SalesforceError.new(response)
  end
end

#https_request(host) ⇒ Object



237
238
239
240
241
242
# File 'lib/salesforce_bulk/client.rb', line 237

# Builds an HTTPS connection object for the given host on port 443.
#
# The server certificate is verified (VERIFY_PEER). These connections carry
# the login credentials and the session id, so verification must stay on.
#
# @param host [String] host name to connect to
# @return [Net::HTTP] a configured, not yet started, connection
def https_request(host)
  req = Net::HTTP.new(host, 443)
  req.use_ssl = true
  req.verify_mode = OpenSSL::SSL::VERIFY_PEER
  req
end

#instance_id(url) ⇒ Object



244
245
246
# File 'lib/salesforce_bulk/client.rb', line 244

# Extracts the part of a URL's host that precedes ".salesforce".
#
# @param url [String] a URL such as the serverUrl from the login result
# @return [String] the captured host prefix (for example "na1")
# @raise [ArgumentError] if the URL has no "://<prefix>.salesforce" portion
def instance_id(url)
  # The dot before "salesforce" is escaped so only a literal "." separates
  # the prefix from the domain.
  match = url.to_s.match(/:\/\/([a-zA-Z0-9\-\.]{2,})\.salesforce/)
  raise ArgumentError, "Unable to determine instance id from URL: #{url.inspect}" unless match

  match[1]
end

#job_info(jobId) ⇒ Object



193
194
195
196
197
# File 'lib/salesforce_bulk/client.rb', line 193

# Fetches "job/<jobId>" and wraps the parsed response in a Job.
#
# @param jobId [String] job identifier used in the request path
# @return [Object] whatever Job.new_from_xml builds from the parsed response
def job_info(jobId)
  body = http_get("job/#{jobId}").body
  Job.new_from_xml(XmlSimple.xml_in(body, 'ForceArray' => false))
end

#query_result(job_id, batch_id, result_id) ⇒ Object



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/salesforce_bulk/client.rb', line 165

# Fetches one query result as CSV and converts it to an Array of Hashes.
#
# The first line of the body supplies the column names, which become Symbol
# keys. Remaining lines are parsed as CSV rows. No converters are applied,
# so values stay as the CSV parser returns them.
#
# @param job_id [String] job identifier used in the request path
# @param batch_id [String] batch identifier used in the request path
# @param result_id [String] result identifier used in the request path
# @return [Array<Hash>] one Hash per data row, keyed by column Symbol
def query_result(job_id, batch_id, result_id)
  request_headers = {"Content-Type" => "text/csv; charset=UTF-8"}
  response = http_get("job/#{job_id}/batch/#{batch_id}/result/#{result_id}", request_headers)

  header_line, *data_lines = response.body.lines.to_a
  columns = CSV.parse_line(header_line).map(&:to_sym)

  records = []
  CSV.parse(data_lines.join, :headers => columns) do |row|
    records << Hash[row.headers.zip(row.fields)]
  end
  records
end