Method: PG::Connection#copy_data

Defined in:
lib/pg/connection.rb

#copy_data(sql, coder = nil) ⇒ Object

call-seq:

conn.copy_data( sql [, coder] ) {|sql_result| ... } -> PG::Result

Execute a copy process for transferring data to or from the server.

This issues the SQL COPY command via #exec. If the command succeeds, the response is a PG::Result object that is passed to the block, bearing a status code of PGRES_COPY_OUT or PGRES_COPY_IN (depending on the specified copy direction). The application should then use #put_copy_data or #get_copy_data to transmit or receive data rows, respectively, and should return from the block when finished.

#copy_data returns another PG::Result object when the data transfer is complete. An exception is raised if a problem was encountered, so it is not necessary to inspect either result object to detect a failure. At this point further SQL commands can be issued via #exec. (It is not possible to execute other SQL commands using the same connection while the COPY operation is in progress.)

This method ensures that the copy process is properly terminated in case of client-side or server-side failures. Therefore, in blocking mode, #copy_data is preferred over raw calls to #put_copy_data, #get_copy_data and #put_copy_end.
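As a rough illustration of the behaviour described above, the following sketch wraps #copy_data in a small helper. The name load_csv_lines is hypothetical and not part of the pg gem, and conn is assumed to be an open PG::Connection in blocking mode. It reads the row count from the final result via PG::Result#cmd_tuples and relies on #copy_data to abort the COPY when the block raises.

```ruby
# Hypothetical helper for illustration; not part of the pg gem.
# conn  - assumed to be an open PG::Connection in blocking mode
# lines - CSV lines, each terminated by "\n"
def load_csv_lines(conn, lines)
  result = conn.copy_data("COPY my_table FROM STDIN CSV") do
    lines.each do |line|
      # Raising inside the block makes copy_data terminate the COPY and re-raise.
      raise ArgumentError, "missing newline: #{line.inspect}" unless line.end_with?("\n")
      conn.put_copy_data(line)
    end
  end
  # The final PG::Result carries the command status of the finished COPY.
  result.cmd_tuples
end
```

A caller only needs to rescue the exception; no manual #put_copy_end call is required in either the success or the failure path.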

coder can be a PG::Coder derivation (typically PG::TextEncoder::CopyRow or PG::TextDecoder::CopyRow). This enables encoding of data fields given to #put_copy_data or decoding of fields received by #get_copy_data.

Example with CSV input format:

conn.exec "create table my_table (a text,b text,c text,d text)"
conn.copy_data "COPY my_table FROM STDIN CSV" do
  conn.put_copy_data "some,data,to,copy\n"
  conn.put_copy_data "more,data,to,copy\n"
end

This creates my_table and inserts two CSV rows.

The same with text format encoder PG::TextEncoder::CopyRow and Array input:

enco = PG::TextEncoder::CopyRow.new
conn.copy_data "COPY my_table FROM STDIN", enco do
  conn.put_copy_data ['some', 'data', 'to', 'copy']
  conn.put_copy_data ['more', 'data', 'to', 'copy']
end
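To make clearer what kind of transformation the encoder performs on each Array, here is a minimal pure-Ruby sketch of the PostgreSQL COPY text format: fields are separated by tabs, rows end with a newline, NULL is written as \N, and backslash, tab, newline and carriage return are escaped. The method encode_copy_row is a hypothetical stand-in for illustration only; it is not the gem's implementation and does not involve type maps.

```ruby
# Escape sequences defined by the PostgreSQL COPY text format.
COPY_TEXT_ESCAPES = {
  "\\" => "\\\\",
  "\t" => "\\t",
  "\n" => "\\n",
  "\r" => "\\r"
}.freeze

# Hypothetical stand-in for illustration; not the gem's encoder.
def encode_copy_row(fields)
  cells = fields.map do |field|
    next "\\N" if field.nil? # NULL marker of the text format
    field.to_s.gsub(/[\\\t\n\r]/, COPY_TEXT_ESCAPES)
  end
  cells.join("\t") + "\n"
end
```

A string produced this way could be passed to #put_copy_data without a coder, in the same manner as the CSV strings in the first example.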

PG::BinaryEncoder::CopyRow can also be used to send data in binary format to the server. In this case copy_data generates the header and trailer data automatically:

enco = PG::BinaryEncoder::CopyRow.new
conn.copy_data "COPY my_table FROM STDIN (FORMAT binary)", enco do
  conn.put_copy_data ['some', 'data', 'to', 'copy']
  conn.put_copy_data ['more', 'data', 'to', 'copy']
end
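For orientation, the header and trailer mentioned above can be sketched in plain Ruby. The layout follows the PostgreSQL binary COPY format: an 11-byte signature, a 32-bit flags field and a 32-bit header extension length, then the tuples, then a 16-bit -1 as trailer. The method names are hypothetical, and the row sketch assumes every field is sent as raw bytes, which matches text columns only.

```ruby
# 11-byte signature of the PostgreSQL binary COPY format.
BINARY_COPY_SIGNATURE = "PGCOPY\n\xFF\r\n\0".b

# Header: signature, 32-bit flags and 32-bit header extension length (both zero).
def binary_copy_header
  BINARY_COPY_SIGNATURE + [0, 0].pack("NN")
end

# One tuple: 16-bit field count, then per field a 32-bit byte length and the bytes.
# A NULL field is encoded as length -1 without any data.
def binary_copy_row(fields)
  fields.reduce([fields.size].pack("n")) do |buf, field|
    if field.nil?
      buf + [-1].pack("l>")
    else
      bytes = field.to_s.b
      buf + [bytes.bytesize].pack("N") + bytes
    end
  end
end

# Trailer: 16-bit integer -1.
def binary_copy_trailer
  [-1].pack("s>")
end
```

When a coder is given and the COPY is binary, copy_data sends the header before yielding and the trailer after the block returns, so application code only supplies the rows.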

Example with CSV output format:

conn.copy_data "COPY my_table TO STDOUT CSV" do
  while row=conn.get_copy_data
    p row
  end
end

This prints all rows of my_table to stdout:

"some,data,to,copy\n"
"more,data,to,copy\n"
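Instead of printing, the rows can be streamed to any writable object. The sketch below assumes conn is an open PG::Connection in blocking mode; the helper name dump_csv and the io parameter are hypothetical.

```ruby
# Hypothetical helper for illustration; not part of the pg gem.
# conn - assumed to be an open PG::Connection in blocking mode
# io   - any object that responds to #write (File, StringIO, socket, ...)
def dump_csv(conn, io)
  rows = 0
  conn.copy_data("COPY my_table TO STDOUT CSV") do
    # get_copy_data returns one row per call and nil once the COPY is finished.
    while (row = conn.get_copy_data)
      io.write(row)
      rows += 1
    end
  end
  rows
end
```

Because each row is written as soon as it arrives, the whole table never has to be held in memory at once.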

The same with text format decoder PG::TextDecoder::CopyRow and Array output:

deco = PG::TextDecoder::CopyRow.new
conn.copy_data "COPY my_table TO STDOUT", deco do
  while row=conn.get_copy_data
    p row
  end
end

This receives each row of my_table as a Ruby Array:

["some", "data", "to", "copy"]
["more", "data", "to", "copy"]
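The reverse direction can be sketched in pure Ruby as well. The hypothetical method decode_copy_row below splits one line of COPY text output into fields, maps \N to nil and resolves the single-character backslash escapes. It is an illustration only, not the gem's decoder, and it deliberately ignores octal and hex escapes.

```ruby
# Single-character escapes of the PostgreSQL COPY text format.
COPY_TEXT_UNESCAPES = {
  "b" => "\b", "f" => "\f", "n" => "\n",
  "r" => "\r", "t" => "\t", "v" => "\v"
}.freeze

# Hypothetical stand-in for illustration; not the gem's decoder.
def decode_copy_row(line)
  # Literal tabs inside data are escaped, so a raw tab always separates fields.
  cells = line.chomp("\n").split("\t", -1)
  cells = [""] if cells.empty? # a single empty field yields an empty line
  cells.map do |cell|
    next nil if cell == "\\N" # NULL marker of the text format
    # Unknown escapes (including "\\\\") resolve to the escaped character itself.
    cell.gsub(/\\(.)/m) { COPY_TEXT_UNESCAPES.fetch($1, $1) }
  end
end
```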

PG::BinaryDecoder::CopyRow can also be used to retrieve data in binary format from the server. In this case the decoder processes the header and trailer data, and copy_data consumes the final nil from get_copy_data, so that binary data can be handled in the same way as text data:

deco = PG::BinaryDecoder::CopyRow.new
conn.copy_data "COPY my_table TO STDOUT (FORMAT binary)", deco do
  while row=conn.get_copy_data
    p row
  end
end

This receives each row of my_table as a Ruby Array:

["some", "data", "to", "copy"]
["more", "data", "to", "copy"]


# File 'lib/pg/connection.rb', line 211

def copy_data( sql, coder=nil )
	raise PG::NotInBlockingMode.new("copy_data can not be used in nonblocking mode", connection: self) if nonblocking?
	res = exec( sql )

	case res.result_status
	when PGRES_COPY_IN
		begin
			if coder && res.binary_tuples == 1
				# Binary file header (11 byte signature, 32 bit flags and 32 bit extension length)
				put_copy_data(BinarySignature + ("\x00" * 8))
			end

			if coder
				old_coder = self.encoder_for_put_copy_data
				self.encoder_for_put_copy_data = coder
			end

			yield res
		rescue Exception => err
			errmsg = "%s while copy data: %s" % [ err.class.name, err.message ]
			begin
				put_copy_end( errmsg )
			rescue PG::Error
				# Ignore error in cleanup to avoid losing original exception
			end
			discard_results
			raise err
		else
			begin
				self.encoder_for_put_copy_data = old_coder if coder

				if coder && res.binary_tuples == 1
					put_copy_data("\xFF\xFF") # Binary file trailer 16 bit "-1"
				end

				put_copy_end
			rescue PG::Error => err
				raise PG::LostCopyState.new("#{err} (probably by executing another SQL query while running a COPY command)", connection: self)
			end
			get_last_result
		ensure
			self.encoder_for_put_copy_data = old_coder if coder
		end

	when PGRES_COPY_OUT
		begin
			if coder
				old_coder = self.decoder_for_get_copy_data
				self.decoder_for_get_copy_data = coder
			end
			yield res
		rescue Exception
			cancel
			discard_results
			raise
		else
			if coder && res.binary_tuples == 1
				# There are two end markers in binary mode: file trailer and the final nil.
				# The file trailer is expected to be processed by BinaryDecoder::CopyRow and already returns nil, so that the remaining NULL from PQgetCopyData is retrieved here:
				if get_copy_data
					discard_results
					raise PG::NotAllCopyDataRetrieved.new("Not all binary COPY data retrieved", connection: self)
				end
			end
			res = get_last_result
			if !res
				discard_results
				raise PG::LostCopyState.new("Lost COPY state (probably by executing another SQL query while running a COPY command)", connection: self)
			elsif res.result_status != PGRES_COMMAND_OK
				discard_results
				raise PG::NotAllCopyDataRetrieved.new("Not all COPY data retrieved", connection: self)
			end
			res
		ensure
			self.decoder_for_get_copy_data = old_coder if coder
		end

	else
		raise ArgumentError, "SQL command is no COPY statement: #{sql}"
	end
end