Class: SalesforceBulk::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/salesforce_bulk/client.rb

Overview

Interface for operating the Salesforce Bulk REST API

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Client

Returns a new instance of Client.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/salesforce_bulk/client.rb', line 23

def initialize(options={})
  if options.is_a?(String)
    options = YAML.load_file(options)
    options.symbolize_keys!
  end
  
  options = {:login_host => 'login.salesforce.com', :version => 24.0}.merge(options)
  
  options.assert_valid_keys(:username, :password, :login_host, :version)
  
  self.username = options[:username]
  self.password = "#{options[:password]}"
  self. = options[:login_host]
  self.version = options[:version]
  
  @api_path_prefix = "/services/async/#{version}/"
  @valid_operations = [:delete, :insert, :update, :upsert, :query]
  @valid_concurrency_modes = ['Parallel', 'Serial']
end

Instance Attribute Details

#instance_hostObject

The instance host to use for API calls. Determined from login response.



12
13
14
# File 'lib/salesforce_bulk/client.rb', line 12

def instance_host
  @instance_host
end

#login_hostObject

The host to use for authentication. Defaults to login.salesforce.com.



9
10
11
# File 'lib/salesforce_bulk/client.rb', line 9

def 
  @login_host
end

#passwordObject

The Salesforce password



15
16
17
# File 'lib/salesforce_bulk/client.rb', line 15

def password
  @password
end

#usernameObject

The Salesforce username



18
19
20
# File 'lib/salesforce_bulk/client.rb', line 18

def username
  @username
end

#versionObject

The API version the client is using. Defaults to 24.0.



21
22
23
# File 'lib/salesforce_bulk/client.rb', line 21

def version
  @version
end

Instance Method Details

#abort_job(jobId) ⇒ Object



67
68
69
70
71
72
73
74
75
76
# File 'lib/salesforce_bulk/client.rb', line 67

def abort_job(jobId)
  xml = '<?xml version="1.0" encoding="utf-8"?>'
  xml += '<jobInfo xmlns="http://www.force.com/2009/06/asyncapi/dataload">'
  xml += "<state>Aborted</state>"
  xml += "</jobInfo>"
  
  response = http_post("job/#{jobId}", xml)
  data = XmlSimple.xml_in(response.body, 'ForceArray' => false)
  Job.new_from_xml(data)
end

#add_batch(jobId, data) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/salesforce_bulk/client.rb', line 78

def add_batch(jobId, data)
  body = data
  
  if data.is_a?(Array)
    raise ArgumentError, "Data set exceeds 10000 record limit by #{data.length - 10000}" if data.length > 10000
    
    keys = data.first.keys
    body = keys.to_csv
    
    data.each do |item|
      item_values = keys.map { |key| item[key] }
      body += item_values.to_csv
    end
  end
  
  # Despite the content for a query operation batch being plain text we 
  # still have to specify CSV content type per API docs.
  response = http_post("job/#{jobId}/batch", body, "Content-Type" => "text/csv; charset=UTF-8")
  result = XmlSimple.xml_in(response.body, 'ForceArray' => false)
  Batch.new_from_xml(result)
end

#add_job(operation, sobject, options = {}) ⇒ Object

Raises:

  • (ArgumentError)


100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/salesforce_bulk/client.rb', line 100

def add_job(operation, sobject, options={})
  operation = operation.to_s.downcase.to_sym
  
  raise ArgumentError.new("Invalid operation: #{operation}") unless @valid_operations.include?(operation)
  
  options.assert_valid_keys(:external_id_field_name, :concurrency_mode)
  
  if options[:concurrency_mode]
    concurrency_mode = options[:concurrency_mode].capitalize
    raise ArgumentError.new("Invalid concurrency mode: #{concurrency_mode}") unless @valid_concurrency_modes.include?(concurrency_mode)
  end
  
  xml = '<?xml version="1.0" encoding="utf-8"?>'
  xml += '<jobInfo xmlns="http://www.force.com/2009/06/asyncapi/dataload">'
  xml += "<operation>#{operation}</operation>"
  xml += "<object>#{sobject}</object>"
  xml += "<externalIdFieldName>#{options[:external_id_field_name]}</externalIdFieldName>" if options[:external_id_field_name]
  xml += "<concurrencyMode>#{options[:concurrency_mode]}</concurrencyMode>" if options[:concurrency_mode]
  xml += "<contentType>CSV</contentType>"
  xml += "</jobInfo>"
  
  response = http_post("job", xml)
  data = XmlSimple.xml_in(response.body, 'ForceArray' => false)
  job = Job.new_from_xml(data)
end

#authenticateObject



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/salesforce_bulk/client.rb', line 43

def authenticate
  xml = '<?xml version="1.0" encoding="utf-8"?>'
  xml += '<env:Envelope xmlns:xsd="http://www.w3.org/2001/XMLSchema"'
  xml += ' xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"'
  xml += ' xmlns:env="http://schemas.xmlsoap.org/soap/envelope/">'
  xml += "<env:Body>"
  xml += '<n1:login xmlns:n1="urn:partner.soap.sforce.com">'
  xml += "<n1:username>#{username}</n1:username>"
  xml += "<n1:password>#{password}</n1:password>"
  xml += "</n1:login>"
  xml += "</env:Body>"
  xml += "</env:Envelope>\n"
  
  response = http_post("/services/Soap/u/#{version}", xml, 'Content-Type' => 'text/xml', 'SOAPAction' => 'login')
  
  data = XmlSimple.xml_in(response.body, 'ForceArray' => false)
  result = data['Body']['loginResponse']['result']
  
  @session_id = result['sessionId']
  
  self.instance_host = "#{instance_id(result['serverUrl'])}.salesforce.com"
  self
end

#batch_info(jobId, batchId) ⇒ Object



139
140
141
142
143
# File 'lib/salesforce_bulk/client.rb', line 139

def batch_info(jobId, batchId)
  response = http_get("job/#{jobId}/batch/#{batchId}")
  result = XmlSimple.xml_in(response.body, 'ForceArray' => false)
  Batch.new_from_xml(result)
end

#batch_info_list(jobId) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/salesforce_bulk/client.rb', line 126

def batch_info_list(jobId)
  response = http_get("job/#{jobId}/batch")
  result = XmlSimple.xml_in(response.body, 'ForceArray' => false)
  
  if result['batchInfo'].is_a?(Array)
    result['batchInfo'].collect do |info|
      Batch.new_from_xml(info)
    end
  else
    [Batch.new_from_xml(result['batchInfo'])]
  end
end

#batch_result(jobId, batchId) ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/salesforce_bulk/client.rb', line 145

def batch_result(jobId, batchId)
  response = http_get("job/#{jobId}/batch/#{batchId}/result")
  
  if response.body =~ /<.*?>/m
    result = XmlSimple.xml_in(response.body)
    
    if result['result'].present?
      results = query_result(jobId, batchId, result['result'].first)
      
      collection = QueryResultCollection.new(self, jobId, batchId, result['result'].first, result['result'])
      collection.replace(results)
    end
  else
    result = BatchResultCollection.new(jobId, batchId)
    
    CSV.parse(response.body, :headers => true) do |row|
      result << BatchResult.new(row[0], row[1].to_b, row[2].to_b, row[3])
    end
    
    result
  end
end

#close_job(jobId) ⇒ Object



185
186
187
188
189
190
191
192
193
194
# File 'lib/salesforce_bulk/client.rb', line 185

def close_job(jobId)
  xml = '<?xml version="1.0" encoding="utf-8"?>'
  xml += '<jobInfo xmlns="http://www.force.com/2009/06/asyncapi/dataload">'
  xml += "<state>Closed</state>"
  xml += "</jobInfo>"
  
  response = http_post("job/#{jobId}", xml)
  data = XmlSimple.xml_in(response.body, 'ForceArray' => false)
  Job.new_from_xml(data)
end

#http_get(path, headers = {}) ⇒ Object



222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/salesforce_bulk/client.rb', line 222

def http_get(path, headers={})
  path = "#{@api_path_prefix}#{path}"
  
  headers = {'Content-Type' => 'application/xml'}.merge(headers)
  
  if @session_id
    headers['X-SFDC-Session'] = @session_id
  end
  
  response = https_request(self.instance_host).get(path, headers)
  
  if response.is_a?(Net::HTTPSuccess)
    response
  else
    raise SalesforceError.new(response)
  end
end

#http_post(path, body, headers = {}) ⇒ Object



202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/salesforce_bulk/client.rb', line 202

def http_post(path, body, headers={})
  headers = {'Content-Type' => 'application/xml'}.merge(headers)
  
  if @session_id
    headers['X-SFDC-Session'] = @session_id
    host = instance_host
    path = "#{@api_path_prefix}#{path}"
  else
    host = self.
  end
  
  response = https_request(host).post(path, body, headers)
  
  if response.is_a?(Net::HTTPSuccess)
    response
  else
    raise SalesforceError.new(response)
  end
end

#https_request(host) ⇒ Object



240
241
242
243
244
245
# File 'lib/salesforce_bulk/client.rb', line 240

def https_request(host)
  req = Net::HTTP.new(host, 443)
  req.use_ssl = true
  req.verify_mode = OpenSSL::SSL::VERIFY_NONE
  req
end

#instance_id(url) ⇒ Object



247
248
249
# File 'lib/salesforce_bulk/client.rb', line 247

def instance_id(url)
  url.match(/:\/\/([a-zA-Z0-9-]{2,}).salesforce/)[1]
end

#job_info(jobId) ⇒ Object



196
197
198
199
200
# File 'lib/salesforce_bulk/client.rb', line 196

def job_info(jobId)
  response = http_get("job/#{jobId}")
  data = XmlSimple.xml_in(response.body, 'ForceArray' => false)
  Job.new_from_xml(data)
end

#query_result(job_id, batch_id, result_id) ⇒ Object



168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/salesforce_bulk/client.rb', line 168

def query_result(job_id, batch_id, result_id)
  headers = {"Content-Type" => "text/csv; charset=UTF-8"}
  response = http_get("job/#{job_id}/batch/#{batch_id}/result/#{result_id}", headers)
  
  lines = response.body.lines.to_a
  headers = CSV.parse_line(lines.shift).collect { |header| header.to_sym }
  
  result = []
  
  #CSV.parse(lines.join, :headers => headers, :converters => [:all, lambda{|s| s.to_b if s.kind_of? String }]) do |row|
  CSV.parse(lines.join, :headers => headers) do |row|
    result << Hash[row.headers.zip(row.fields)]
  end
  
  result
end