Module: ActiveRecord::Import::PostgreSQLAdapter

Includes:
ImportSupport, OnDuplicateKeyUpdateSupport
Included in:
ConnectionAdapters::PostgreSQLAdapter
Defined in:
lib/activerecord-import/adapters/postgresql_adapter.rb

Constant Summary collapse

MIN_VERSION_FOR_UPSERT =
90_500

Instance Method Summary collapse

Methods included from ImportSupport

#supports_import?

Instance Method Details

#add_column_for_on_duplicate_key_update(column, options = {}) ⇒ Object

Adds a column to the set of columns updated when a duplicate key is encountered.



55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 55

# Registers +column+ among the columns refreshed on a duplicate-key conflict.
#
# Reads options[:on_duplicate_key_update] and mutates it in place:
# * Array - the column symbol is appended unless it is already present.
# * Hash  - its :columns entry (created as [] when missing) receives the
#   column: appended when :columns is an Array, mapped to itself when
#   :columns is a Hash.
# Any other value (including a missing option) is left untouched.
def add_column_for_on_duplicate_key_update( column, options = {} ) # :nodoc:
  setting = options[:on_duplicate_key_update]
  case setting
  when Hash
    targets = setting.fetch( :columns ) { setting[:columns] = [] }
    if targets.is_a?( Array )
      name = column.to_sym
      targets << name unless targets.include?( name )
    elsif targets.is_a?( Hash )
      name = column.to_sym
      targets[name] = name
    end
  when Array
    name = column.to_sym
    setting << name unless setting.include?( name )
  end
end

#duplicate_key_update_error?(exception) ⇒ Boolean

Returns true if the exception is a duplicate-key record error.

Returns:

  • (Boolean)


157
158
159
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 157

# Tells whether +exception+ looks like a duplicate-key violation: it must be
# an ActiveRecord::StatementInvalid whose message contains 'duplicate key'.
def duplicate_key_update_error?(exception) # :nodoc:
  return false unless exception.is_a?(ActiveRecord::StatementInvalid)

  exception.to_s.include?('duplicate key')
end

#insert_many(sql, values, *args) ⇒ Object

:nodoc:



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 7

# Runs one multi-row INSERT assembled from +sql+ and +values+.
#
# sql    - either the complete INSERT prefix as a String, or an Array whose
#          first element is that prefix and whose remaining elements are
#          trailing clauses (joined with single spaces).
# values - Array of already-rendered value tuples; joined with commas.
# args   - forwarded unchanged to select_values / insert.
#
# Returns [number_of_inserts, ids]. number_of_inserts is always 1; ids holds
# whatever select_values returns when the trailing clauses contain RETURNING,
# and is an empty Array otherwise.
def insert_many( sql, values, *args ) # :nodoc:
  number_of_inserts = 1
  ids = []

  # Read the parts without shift so the caller's Array is left intact.
  base_sql, post_sql = if sql.is_a?( String )
    [sql, '']
  elsif sql.is_a?( Array )
    [sql.first, sql.drop( 1 ).join( ' ' )]
  end

  sql2insert = base_sql + values.join( ',' ) + post_sql
  if post_sql =~ /RETURNING\s/
    ids = select_values( sql2insert, *args )
    # Clear the query cache (when enabled) once the RETURNING statement has run.
    query_cache.clear if query_cache_enabled
  else
    insert( sql2insert, *args )
  end

  [number_of_inserts, ids]
end

#next_value_for_sequence(sequence_name) ⇒ Object



28
29
30
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 28

# Builds the SQL expression that draws the next value from +sequence_name+.
# The name is interpolated as given, without any quoting or escaping.
def next_value_for_sequence(sequence_name)
  "nextval('#{sequence_name}')"
end

#post_sql_statements(table_name, options) ⇒ Object

:nodoc:



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 32

# Collects the SQL fragments appended after the VALUES list of an import.
#
# options keys read here: :ignore, :on_duplicate_key_ignore,
# :on_duplicate_key_update, :recursive, :no_returning and :primary_key.
#
# Returns an Array of SQL fragments: an optional ON CONFLICT DO NOTHING
# clause, whatever super contributes, and an optional RETURNING clause
# listing the primary key column(s).
def post_sql_statements( table_name, options ) # :nodoc:
  statements = []
  updating = options[:on_duplicate_key_update]

  if supports_on_duplicate_key_update?
    ignoring = options[:ignore] || options[:on_duplicate_key_ignore]
    # Options :recursive and :on_duplicate_key_ignore are mutually exclusive
    if ignoring && !updating && !options[:recursive]
      statements << sql_for_on_duplicate_key_ignore( table_name, options[:on_duplicate_key_ignore] )
    end
  elsif options[:on_duplicate_key_ignore] && !updating
    logger.warn "Ignoring on_duplicate_key_ignore because it is not supported by the database."
  end

  statements.concat( super(table_name, options) )

  # RETURNING is emitted only when the caller has not opted out and a
  # primary key is known.
  primary_key = options[:primary_key]
  if !options[:no_returning] && !primary_key.blank?
    statements << " RETURNING (#{Array(primary_key).join(', ')})"
  end

  statements
end

#sql_for_conflict_target(args = {}) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 126

# Renders the conflict-target portion of an ON CONFLICT clause.
#
# args keys:
#   :constraint_name - when present, wins and yields "ON CONSTRAINT <name> ".
#   :conflict_target - a column or list of columns; blank entries are dropped
#                      and the rest are joined as "(a, b) ".
#   :index_predicate - optional; appended as "WHERE <predicate> " after the
#                      column list (ignored when :constraint_name is used).
#
# Returns the fragment (with a trailing space) or nil when neither
# :constraint_name nor :conflict_target is present.
def sql_for_conflict_target( args = {} )
  constraint_name = args[:constraint_name]
  return "ON CONSTRAINT #{constraint_name} " if constraint_name.present?

  conflict_target = args[:conflict_target]
  return unless conflict_target.present?

  # blank? (rather than empty?) also tolerates nil and non-string entries.
  columns = Array( conflict_target ).reject( &:blank? ).join( ', ' )
  index_predicate = args[:index_predicate]

  # Build fresh strings instead of appending to literals, so this keeps
  # working when string literals are frozen.
  clause = "(#{columns}) "
  clause += "WHERE #{index_predicate} " if index_predicate
  clause
end

#sql_for_default_conflict_target(table_name) ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 139

# Derives the default conflict target for +table_name+ from its primary key.
#
# Queries pg_constraint / pg_attribute (through select_values, tagged
# "SCHEMA") for the names of the table's primary-key columns, in the order
# they appear in the constraint.
#
# Returns "(col1, col2) " (with a trailing space), or nil when no
# primary-key columns are found.
def sql_for_default_conflict_target( table_name )
  pks = select_values(<<-SQL.strip_heredoc, "SCHEMA")
    WITH pk_constraint AS (
      SELECT conrelid, unnest(conkey) AS connum FROM pg_constraint
      WHERE contype = 'p'
        AND conrelid = #{quote(quote_table_name(table_name))}::regclass
    ), cons AS (
      SELECT conrelid, connum, row_number() OVER() AS rownum FROM pk_constraint
    )
    SELECT attr.attname FROM pg_attribute attr
    INNER JOIN cons ON attr.attrelid = cons.conrelid AND attr.attnum = cons.connum
    ORDER BY cons.rownum
  SQL
  conflict_target = pks.join(', ')
  "(#{conflict_target}) " if conflict_target.present?
end

#sql_for_on_duplicate_key_ignore(table_name, *args) ⇒ Object

Returns a generated ON CONFLICT DO NOTHING statement given the passed in args.



70
71
72
73
74
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 70

# Produces the " ON CONFLICT ... DO NOTHING" clause.
#
# Only the first extra argument is examined: when it is a Hash it is handed
# to sql_for_conflict_target and the resulting fragment is placed between
# "ON CONFLICT" and "DO NOTHING"; any other value adds no conflict target.
def sql_for_on_duplicate_key_ignore( table_name, *args ) # :nodoc:
  settings = args.first
  target = settings.is_a?( Hash ) ? sql_for_conflict_target( settings ) : nil
  " ON CONFLICT #{target}DO NOTHING"
end

#sql_for_on_duplicate_key_update(table_name, *args) ⇒ Object

Returns a generated ON CONFLICT DO UPDATE statement given the passed in args.



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 78

# Produces the " ON CONFLICT ... DO UPDATE SET ..." clause (or DO NOTHING).
#
# Only the first extra argument is examined:
# * Array or String - shorthand for { columns: <value> }.
# * Hash - may carry :columns plus the keys understood by
#   sql_for_conflict_target.
# * anything else - the method returns nil.
#
# When :columns is missing or empty the clause degrades to DO NOTHING.
# Otherwise the conflict target falls back to
# sql_for_default_conflict_target( table_name ) when none was given, and
# :columns is rendered as an Array (same-name assignments), a Hash
# (target => source assignments) or a raw String used verbatim.
#
# Raises ArgumentError when no conflict target can be determined, or when
# :columns is not an Array, Hash or String.
def sql_for_on_duplicate_key_update( table_name, *args ) # :nodoc:
  arg = args.first
  arg = { columns: arg } if arg.is_a?( Array ) || arg.is_a?( String )
  return unless arg.is_a?( Hash )

  conflict_target = sql_for_conflict_target( arg )

  columns = arg.fetch( :columns, [] )
  if columns.respond_to?( :empty? ) && columns.empty?
    return " ON CONFLICT #{conflict_target}DO NOTHING"
  end

  conflict_target ||= sql_for_default_conflict_target( table_name )
  unless conflict_target
    raise ArgumentError, 'Expected :conflict_target or :constraint_name to be specified'
  end

  assignments =
    case columns
    when Array then sql_for_on_duplicate_key_update_as_array( table_name, columns )
    when Hash then sql_for_on_duplicate_key_update_as_hash( table_name, columns )
    when String then columns
    else raise ArgumentError, 'Expected :columns to be an Array, Hash or String'
    end

  # Interpolate into a new string rather than appending to a literal, so the
  # method keeps working when string literals are frozen.
  " ON CONFLICT #{conflict_target}DO UPDATE SET #{assignments}"
end

#sql_for_on_duplicate_key_update_as_array(table_name, arr) ⇒ Object

:nodoc:



109
110
111
112
113
114
115
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 109

# Renders "col=EXCLUDED.col" assignments, comma-separated, for every entry of
# +arr+; each name is passed through quote_column_name first. +table_name+ is
# accepted but not used here.
def sql_for_on_duplicate_key_update_as_array( table_name, arr ) # :nodoc:
  quoted_names = arr.map { |name| quote_column_name( name ) }
  quoted_names.map { |quoted| "#{quoted}=EXCLUDED.#{quoted}" }.join( ',' )
end

#sql_for_on_duplicate_key_update_as_hash(table_name, hsh) ⇒ Object

:nodoc:



117
118
119
120
121
122
123
124
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 117

# Renders "target=EXCLUDED.source" assignments, comma-separated, for every
# key/value pair of +hsh+; both names are passed through quote_column_name.
# +table_name+ is accepted but not used here.
def sql_for_on_duplicate_key_update_as_hash( table_name, hsh ) # :nodoc:
  assignments = hsh.map do |target, source|
    "#{quote_column_name( target )}=EXCLUDED.#{quote_column_name( source )}"
  end
  assignments.join( ',' )
end

#support_setting_primary_key_of_imported_objects? ⇒ Boolean

Returns:

  • (Boolean)


165
166
167
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 165

# Capability flag; this adapter always answers true.
# NOTE(review): presumably consulted by the import code to decide whether
# returned ids may be written back onto imported records - confirm at caller.
def support_setting_primary_key_of_imported_objects?
  true
end

#supports_on_duplicate_key_update?(current_version = postgresql_version) ⇒ Boolean

Returns:

  • (Boolean)


161
162
163
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 161

# Reports whether +current_version+ is at least MIN_VERSION_FOR_UPSERT.
#
# current_version - version to test; defaults to postgresql_version.
#
# Returns true or false.
def supports_on_duplicate_key_update?(current_version = postgresql_version)
  current_version >= MIN_VERSION_FOR_UPSERT
end