Module: PG::EM::Client::Helper

Included in:
Transaction
Defined in:
lib/em-pg-client-helper.rb

Overview

Some helper methods to make working with em-pg-client slightly less like trying to build a house with a packet of seeds and a particle accelerator... backwards.

Defined Under Namespace

Classes: BadSequelError

Instance Method Details

#db_bulk_insert(db, tbl, columns, rows, &blk) ⇒ EM::Deferrable

Note:

for details on the tbl, columns, and rows parameters, see PG::EM::Client::Helper::Transaction#bulk_insert.

Efficiently perform a "bulk" insert of multiple rows.

When you have a large quantity of data to insert into a table, you don't want to do it one row at a time -- that's really inefficient. On the other hand, if you do one giant multi-row insert statement, the insert will fail if any of the rows causes a constraint failure. What to do?

Well, here's our answer: try to insert all the records at once. If that fails with a constraint violation, then split the set of records in half and try to bulk insert each of those halves. Recurse in this fashion until you only have one record to insert.

Parameters:

  • db (PG::EM::Client, PG::EM::ConnectionPool)

    the connection against which the insert will be run.

  • tbl (#to_sym)

    see PG::EM::Client::Helper::Transaction#bulk_insert.

  • columns (Array<#to_sym>)

    see PG::EM::Client::Helper::Transaction#bulk_insert.

  • rows (Array<Array<Object>>)

    see PG::EM::Client::Helper::Transaction#bulk_insert.

Returns:

  • (EM::Deferrable)

    the deferrable in which the query is being called; once the bulk insert completes successfully, the deferrable will succeed with the number of rows that were successfully inserted. If the insert could not be completed, the deferrable will fail (#errback) with the exception.

Since:

  • 2.0.0



# File 'lib/em-pg-client-helper.rb', line 234

def db_bulk_insert(db, tbl, columns, rows, &blk)
	EM::Completion.new.tap do |df|
		df.callback(&blk) if blk

		db_transaction(db) do |txn|
			txn.bulk_insert(tbl, columns, rows) do |count|
				txn.commit do
					df.succeed(count)
				end
			end
		end.errback do |ex|
			df.fail(ex)
		end
	end
end
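
The split-in-half strategy described above can be sketched without a database. This is not the library's implementation: `FakeTable`, `ConstraintViolation`, and `bisecting_insert` are hypothetical names, the "table" is an in-memory array with a unique first column, and the sketch assumes that a single row which still fails is skipped rather than aborting the whole insert.

```ruby
# Hypothetical stand-in for a database constraint failure.
class ConstraintViolation < StandardError; end

# Hypothetical in-memory table with a unique constraint on the first
# column. insert_all is all-or-nothing, like a multi-row INSERT.
class FakeTable
  attr_reader :rows

  def initialize
    @rows = []
  end

  def insert_all(batch)
    keys = @rows.map(&:first)
    batch.each do |row|
      raise ConstraintViolation, "duplicate key #{row.first.inspect}" if keys.include?(row.first)
      keys << row.first
    end
    @rows.concat(batch)
    batch.length
  end
end

# Try the whole batch; on a constraint violation, split it in half and
# recurse into each half. A single row that still fails is skipped
# (an assumption of this sketch). Returns the number of rows inserted.
def bisecting_insert(table, rows)
  return 0 if rows.empty?
  table.insert_all(rows)
rescue ConstraintViolation
  return 0 if rows.length == 1
  mid = rows.length / 2
  bisecting_insert(table, rows[0...mid]) + bisecting_insert(table, rows[mid..-1])
end

table = FakeTable.new
table.insert_all([[2, "existing"]])
inserted = bisecting_insert(table, [[1, "a"], [2, "b"], [3, "c"], [4, "d"]])
puts inserted # the row with key 2 collides, the other three go in
```

When most batches are clean, this costs one statement per batch; only batches containing a bad row pay for the extra round trips.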

#db_insert(db, tbl, params) ⇒ EM::Deferrable

Run an insert query, without having to write a great pile of SQL all by yourself.

Parameters:

  • db (PG::EM::Client, PG::EM::ConnectionPool)

    the connection against which all database operations will be run.

  • tbl (#to_s)

    the name of the table into which you wish to insert your data. This parameter will be automatically quoted, if necessary.

  • params (Hash<#to_s, Object>)

    the fields you wish to insert into (the keys of the hash) and the values to insert into each field (the values of the hash). All field names and data will be automatically quoted and made safe, so you're automatically SQL injection-proof!

Returns:

  • (EM::Deferrable)

    the deferrable in which the query is being called; this means you should attach the code to run after the query completes with #callback, and you can attach an error handler with #errback if you like.



# File 'lib/em-pg-client-helper.rb', line 194

def db_insert(db, tbl, params)
	db.exec_defer(*insert_sql(tbl, params))
end

#db_sequel(db) {|sqldb| ... } ⇒ EM::Deferrable

Generate SQL with Sequel, and run it against the database connection provided.

This is the all-singing variant of #sequel_sql -- in addition to generating the SQL, we also run the result against the database connection you pass.

Examples:

A simple select

db_sequel(db) do |sqldb|
  sqldb[:artists]
end.callback do |res|
  ...
end.errback do |ex|
  logger.error "Query failed (#{ex.class}): #{ex.message}"
end

A very complicated select

db_sequel(db) do |sqldb|
  sqldb[:posts].select_all(:posts).
    select_append(
      Sequel.
        function(:array_agg, :campaigns__name).
        distinct.
        as(:campaigns_names)
      ).
    join(:posts_terms, :post_id => :posts__id).
    join(:terms, :id => :posts_terms__term_id).
    join(:categories_terms, :term_id => :terms__id).
    join(:campaigns_categories, :category_id => :categories_terms__category_id).
    join(:campaigns, :id => :campaigns_categories__campaign_id).
    group_by(:posts__id)
end.callback do |res|
  ...
end

Delete some rows

db_sequel(db) do |sqldb|
  sqldb[:foo].where { { col.sql_number % 3 => 0 } }.delete
end.callback do |res|
  logger.info "Deleted #{res.cmd_tuples}"
end

Yields:

  • (sqldb)

    to allow you to call whatever Sequel methods you like to generate the desired SQL.

Yield Parameters:

  • sqldb (Sequel::Database)

    call whatever you like against this to cause Sequel to generate the result you want.

Returns:

  • (EM::Deferrable)

    the callbacks attached to this deferrable will receive a PG::Result when the query completes successfully, or else the errbacks on this deferrable will be called in the event of an error.

Raises:

  • (BadSequelError)

    if the block does not generate exactly one SQL statement.

See Also:

  • #sequel_sql


# File 'lib/em-pg-client-helper.rb', line 153

def db_sequel(db, &blk)
	db.exec_defer(sequel_sql(&blk))
end

#db_transaction(db, opts = {}, &blk) ⇒ EM::Deferrable

Note:

Due to the way that transactions detect when they are completed, every deferrable in the scope of the transaction must be generated by the transaction. That is, you cannot use objects other than the transaction asynchronously. This is a known limitation, and will be addressed in a future version of this library.

Execute code in a transaction.

Calling this method opens up a transaction (by executing BEGIN), and then runs the supplied block, passing in a transaction object which you can use to execute SQL commands. Once the transaction is finished, COMMIT or ROLLBACK will be sent to the DB server to complete the transaction, depending on whether or not any errors (query failures or Ruby exceptions) appeared during the transaction. You can also manually call txn.rollback(reason) if you want to signal that the transaction should be rolled back.

You should use #callback and #errback against the deferrable that db_transaction returns to specify what to run after the transaction completes successfully or fails, respectively.

Parameters:

  • db (PG::EM::Client, PG::EM::ConnectionPool)

    the connection against which the transaction will be executed. If you pass a ConnectionPool, we will automatically hold a single connection for the transaction to complete against, so you don't have to worry about that, either.

  • opts (Hash) (defaults to: {})

    Zero or more options which change the behaviour of the transaction.

  • blk (Proc)

    code which will be executed within the context of the transaction. This block will be passed a Transaction instance, which has methods to allow you to commit or rollback the transaction, and execute SQL statements within the context of the transaction.

Options Hash (opts):

  • :isolation (Symbol)

    An isolation level for the transaction. Valid values are :serializable, :repeatable_read, :read_committed, and :read_uncommitted. The last two of these are pointless to use and are included only for completeness, as PostgreSQL's default isolation level is :read_committed, and :read_uncommitted is equivalent to :read_committed.

  • :retry (TrueClass, FalseClass)

    Whether or not to retry the transaction if it fails for one of a number of transaction-internal reasons.

  • :deferrable (TrueClass, FalseClass)

    If set, enables the DEFERRABLE transaction mode. For details of what this is, see the SET TRANSACTION command documentation in the PostgreSQL manual.

Returns:

  • (EM::Deferrable)

    on which you can call #callback and #errback to define what to do when the transaction succeeds or fails, respectively.

Raises:

  • (ArgumentError)

    If an unrecognised value for the :isolation option is given.



# File 'lib/em-pg-client-helper.rb', line 417

def db_transaction(db, opts = {}, &blk)
	if db.is_a? PG::EM::ConnectionPool
		db.__send__(:hold_deferred) do |conn|
			::PG::EM::Client::Helper::Transaction.new(conn, opts, &blk)
		end
	else
		::PG::EM::Client::Helper::Transaction.new(db, opts, &blk)
	end
end
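
To make the :isolation and :deferrable options concrete, here is a minimal sketch of how they could map onto a PostgreSQL BEGIN statement. `begin_statement` and `ISOLATION_LEVELS` are hypothetical names invented for this illustration; the Transaction class may build its opening statement differently.

```ruby
# Hypothetical mapping from the documented :isolation symbols to the
# PostgreSQL isolation level keywords.
ISOLATION_LEVELS = {
  serializable:     "SERIALIZABLE",
  repeatable_read:  "REPEATABLE READ",
  read_committed:   "READ COMMITTED",
  read_uncommitted: "READ UNCOMMITTED"
}.freeze

# Build the statement that opens a transaction with the given options.
# Raises ArgumentError for an unrecognised :isolation value, mirroring
# the behaviour documented for db_transaction.
def begin_statement(opts = {})
  parts = ["BEGIN"]

  if opts.key?(:isolation)
    level = ISOLATION_LEVELS.fetch(opts[:isolation]) do
      raise ArgumentError, "Unrecognised isolation level: #{opts[:isolation].inspect}"
    end
    parts << "ISOLATION LEVEL #{level}"
  end

  parts << "DEFERRABLE" if opts[:deferrable]
  parts.join(" ")
end

puts begin_statement
puts begin_statement(isolation: :serializable, deferrable: true)
```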

#db_upsert(db, tbl, key, data) {|PG::Result| ... } ⇒ EM::Deferrable

Run an upsert query.

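Apply an upsert (update-or-insert) against a given database connection or connection pool. If the query fails with a PG::UniqueViolation (rare, but possible when another session inserts the same key concurrently), it is retried once before the failure is reported.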

Parameters:

  • db (PG::EM::Client, PG::EM::ConnectionPool)

    the connection against which all database operations will be run.

  • tbl (#to_s)

    The name of the table on which to operate.

  • key (#to_s, Array<#to_s>)

    A field (or list of fields) whose values uniquely identify the record to be updated, if it exists. You only need to specify the field names here, as the values which will be used in the query will be taken from the data.

  • data (Hash<#to_s, Object>)

    The fields and values to insert into the database, or to set in the existing record.

Yields:

  • (PG::Result)

    the row of data that has been inserted/updated.

Returns:

  • (EM::Deferrable)

    the deferrable in which the query is being called; this means you should attach the code to run after the query completes with #callback, and you can attach an error handler with #errback if you like.

Raises:

  • (ArgumentError)

    if a field specified in key does not exist in data.

See Also:

  • #upsert_sql



# File 'lib/em-pg-client-helper.rb', line 339

def db_upsert(db, tbl, key, data)
	q = upsert_sql(tbl, key, data)

	::EM::DefaultDeferrable.new.tap do |df|
		db.exec_defer(*q).callback do |r|
			df.succeed(r)
		end.errback do |ex|
			if ex.is_a?(PG::UniqueViolation)
				db.exec_defer(*q).callback do |r|
					df.succeed(r)
				end.errback do |ex|
					df.fail(ex)
				end
			else
				df.fail(ex)
			end
		end
	end
end
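
The retry-once behaviour in the method above can be illustrated with a synchronous sketch. The real method chains EventMachine deferrables; here a plain block stands in for the query, and `UniqueViolation` and `with_one_retry` are hypothetical names used so the example runs without the pg gem.

```ruby
# Hypothetical stand-in for PG::UniqueViolation.
class UniqueViolation < StandardError; end

# Run the block; if it raises UniqueViolation, run it exactly once more.
# A second failure, or any other exception, propagates to the caller.
def with_one_retry
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue UniqueViolation
    retry if attempts < 2
    raise
  end
end

result = with_one_retry do |attempt|
  # Simulate losing the race on the first attempt only.
  raise UniqueViolation, "lost the race" if attempt == 1
  "row written on attempt #{attempt}"
end

puts result
```

One retry is normally enough: by the time the second attempt runs, the conflicting row exists, so the UPDATE half of the upsert finds it.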

#insert_sql(tbl, params) ⇒ Array<String, Array<Object>>

Generate SQL for an insert statement into tbl, with the fields and data given by the keys and values, respectively, of params. Returns a two-element array consisting of the parameterised SQL as the first element, and the array of parameters as the second element.

Parameters:

  • tbl (#to_s)
  • params (Hash<#to_s, Object>)


# File 'lib/em-pg-client-helper.rb', line 166

def insert_sql(tbl, params)
	keys = params.keys.map { |k| quote_identifier(k.to_s) }.join(',')
	vals = params.values
	val_places = (1..vals.length).to_a.map { |i| "$#{i}" }.join(',')

	["INSERT INTO #{quote_identifier(tbl.to_s)} (#{keys}) VALUES (#{val_places})", vals]
end
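
As a quick illustration of the return value, the sketch below copies insert_sql and quote_identifier as standalone functions (outside the module, so it runs without the gem) and calls them with a made-up table and fields.

```ruby
def quote_identifier(id)
  "\"#{id.gsub(/"/, '""')}\""
end

def insert_sql(tbl, params)
  keys = params.keys.map { |k| quote_identifier(k.to_s) }.join(',')
  vals = params.values
  val_places = (1..vals.length).map { |i| "$#{i}" }.join(',')

  ["INSERT INTO #{quote_identifier(tbl.to_s)} (#{keys}) VALUES (#{val_places})", vals]
end

# :users, :name and :age are illustrative names only.
sql, values = insert_sql(:users, { name: "Alice", age: 30 })

puts sql       # INSERT INTO "users" ("name","age") VALUES ($1,$2)
p values       # ["Alice", 30]
```

Because the data travels separately as positional parameters, it never needs to be escaped into the SQL string; only the identifiers are quoted.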

#quote_identifier(id) ⇒ String

Take a PgSQL identifier (anything that isn't data, basically) and quote it so that it will always be valid, no matter what insanity someone's decided to put in their names.

Parameters:

  • id (String)

Returns:

  • (String)

    just like id, but with added quoting.



# File 'lib/em-pg-client-helper.rb', line 435

def quote_identifier(id)
	"\"#{id.gsub(/"/, '""')}\""
end
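
A short illustration of the quoting rule, using a standalone copy of the method and made-up identifiers: the name is wrapped in double quotes, and any embedded double quote is doubled.

```ruby
def quote_identifier(id)
  "\"#{id.gsub(/"/, '""')}\""
end

plain  = quote_identifier('users')
tricky = quote_identifier('order "items"')

puts plain   # "users"
puts tricky  # "order ""items"""
```

Note that the method calls gsub on its argument, so a Symbol must be converted with to_s before it is passed in.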

#sequel_sql {|sqldb| ... } ⇒ String

Sequel-based SQL generation.

While we could spend a lot of time writing code to generate various kinds of SQL "by hand", it would be wasted effort, since the Sequel database toolkit gives us a complete, and extremely powerful, SQL generation system, which is already familiar to a great many programmers (or, at least, many great programmers).

Hence, rather than reinvent the wheel, we simply drop Sequel in.

Anything you can do with an instance of Sequel::Database that produces a single SQL query, you can almost certainly do with this method.

Usage is quite simple: calling this method will yield a pseudo-database object to the block you pass. You can then call whatever methods you like against the database object, and when you're done and the block you passed completes, we'll return the SQL that Sequel generated.

Examples:

A simple select

sequel_sql do |sqldb|
  sqldb[:artists]
end
# => "SELECT * FROM artists"

Delete some rows

sequel_sql do |sqldb|
  sqldb[:foo].where { { col.sql_number % 3 => 0 } }.delete
end
# => "DELETE FROM foo WHERE col % 3 = 0"

A very complicated select

sequel_sql do |sqldb|
  sqldb[:posts].select_all(:posts).
    select_append(
      Sequel.
        function(:array_agg, :campaigns__name).
        distinct.
        as(:campaigns_names)
      ).
    join(:posts_terms, :post_id => :posts__id).
    join(:terms, :id => :posts_terms__term_id).
    join(:categories_terms, :term_id => :terms__id).
    join(:campaigns_categories, :category_id => :categories_terms__category_id).
    join(:campaigns, :id => :campaigns_categories__campaign_id).
    group_by(:posts__id)
end
# ... you don't want to know.

Yields:

  • (sqldb)

    to allow you to call whatever Sequel methods you like to generate the desired SQL.

Yield Parameters:

  • sqldb (Sequel::Database)

    call whatever you like against this to cause Sequel to generate the result you want.

Returns:

  • (String)

    the generated SQL.

Raises:

  • (BadSequelError)

    if the block does not generate exactly one SQL statement.



# File 'lib/em-pg-client-helper.rb', line 73

def sequel_sql
	sqldb = Thread.current[:em_pg_client_sequel_db] ||= Sequel.connect("mock://postgres", :keep_reference => false)
	ret = yield sqldb if block_given?
	sqls = sqldb.sqls

	if sqls.empty?
		sqls = [ret.sql] rescue ret
	end

	if sqls.nil? or sqls.empty?
		raise PG::EM::Client::Helper::BadSequelError,
		      "Your block did not generate an SQL statement"
	end

	if sqls.length > 1
		raise PG::EM::Client::Helper::BadSequelError,
		      "Your block generated multiple SQL statements"
	end

	sqls.first
end

#upsert_sql(tbl, key, data) ⇒ Array<String, Array<Object>>

An "upsert" is a kind of crazy hybrid "update if the record exists, insert it if it doesn't" query. It isn't part of the SQL standard, but it is such a common idiom that we're keen to support it.

The trick is that it's actually two queries in one. We try to do an UPDATE first, and if that doesn't actually update anything, then we try an INSERT. Since it is two separate queries, though, there is still a small chance that the query will fail with a PG::UniqueViolation, so your code must handle that.

As an added bonus, the SQL that this method generates will, when executed, return the complete row that has been inserted or updated.

Parameters:

  • tbl (#to_s)

    The name of the table on which to operate.

  • key (#to_s, Array<#to_s>)

    A field (or list of fields) whose values uniquely identify the record to be updated, if it exists. You only need to specify the field names here, as the values which will be used in the query will be taken from the data.

  • data (Hash<#to_s, Object>)

    The fields and values to insert into the database, or to set in the existing record.

Returns:

  • (Array<String, Array<Object>>)

    A two-element array, the first of which is a string containing the literal SQL to be executed, while the second element is an array containing the values, in order corresponding to the placeholders in the SQL.

Raises:

  • (ArgumentError)

    if a field specified in key does not exist in data.



# File 'lib/em-pg-client-helper.rb', line 286

def upsert_sql(tbl, key, data)
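	# tbl is documented as #to_s, so convert before quoting (gsub needs a String)
	tbl = quote_identifier(tbl.to_s)
	insert_keys = data.keys.map { |k| quote_identifier(k.to_s) }
	# build a new array instead of map!-ing the caller's key list in place
	unique_keys = Array(key).map { |k| quote_identifier(k.to_s) }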
	update_keys = insert_keys - unique_keys

	unless (bad_keys = unique_keys - insert_keys).empty?
		raise ArgumentError,
		      "These field(s) were mentioned in the key list, but were not in the data set: #{bad_keys.inspect}"
	end

	values = data.values
	# field-to-placeholder mapping
	i = 0
	fp_map = Hash[insert_keys.map { |k| i += 1; [k, "$#{i}"] }]

	update_values = update_keys.map { |k| "#{k}=#{fp_map[k]}" }.join(',')
	select_values = unique_keys.map { |k| "#{k}=#{fp_map[k]}" }.join(' AND ')
	update_query = "UPDATE #{tbl} SET #{update_values} WHERE #{select_values} RETURNING *"

	insert_query = "INSERT INTO #{tbl} (#{fp_map.keys.join(',')}) " +
	               "SELECT #{fp_map.values.join(',')}"

	["WITH update_query AS (#{update_query}), " +
	      "insert_query AS (#{insert_query} " +
	        "WHERE NOT EXISTS (SELECT * FROM update_query) " +
	        "RETURNING *) " +
	      "SELECT * FROM update_query UNION SELECT * FROM insert_query",
	 data.values
	]
end
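
To show what the generated statement looks like, here is a condensed standalone restatement of the method above, run against a made-up table. It is a sketch for illustration rather than the gem's code: it converts tbl with to_s before quoting, and :users, :id and :name are hypothetical names.

```ruby
def quote_identifier(id)
  "\"#{id.gsub(/"/, '""')}\""
end

def upsert_sql(tbl, key, data)
  tbl         = quote_identifier(tbl.to_s)
  insert_keys = data.keys.map { |k| quote_identifier(k.to_s) }
  unique_keys = Array(key).map { |k| quote_identifier(k.to_s) }
  update_keys = insert_keys - unique_keys

  bad_keys = unique_keys - insert_keys
  unless bad_keys.empty?
    raise ArgumentError, "key field(s) missing from data: #{bad_keys.inspect}"
  end

  # Map each quoted field to its positional placeholder ($1, $2, ...).
  fp_map = insert_keys.each_with_index.map { |k, i| [k, "$#{i + 1}"] }.to_h

  set_clause   = update_keys.map { |k| "#{k}=#{fp_map[k]}" }.join(',')
  where_clause = unique_keys.map { |k| "#{k}=#{fp_map[k]}" }.join(' AND ')

  update_query = "UPDATE #{tbl} SET #{set_clause} WHERE #{where_clause} RETURNING *"
  insert_query = "INSERT INTO #{tbl} (#{fp_map.keys.join(',')}) " \
                 "SELECT #{fp_map.values.join(',')}"

  # The INSERT only fires when the UPDATE touched no rows.
  ["WITH update_query AS (#{update_query}), " \
   "insert_query AS (#{insert_query} " \
   "WHERE NOT EXISTS (SELECT * FROM update_query) RETURNING *) " \
   "SELECT * FROM update_query UNION SELECT * FROM insert_query",
   data.values]
end

sql, values = upsert_sql(:users, :id, { id: 1, name: "Alice" })
puts sql
p values
```

The same placeholder ($1 for the key, $2 for the name) appears in both the UPDATE and the INSERT, which is why a single parameter array serves the whole statement.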