Module: Apartment::TaskHelper
Defined in: lib/apartment/tasks/task_helper.rb
Overview
Coordinates tenant operations for rake tasks with parallel execution support.
## Problem Context
Multi-tenant applications with many schemas face slow migration times when running sequentially. A 100-tenant system with 2-second migrations takes 3+ minutes sequentially but ~20 seconds with 10 parallel workers.
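The arithmetic behind those figures can be checked with a short sketch. It assumes every migration takes the same 2 seconds and ignores worker startup and connection overhead; the variable names are illustrative only.

```ruby
# Back-of-the-envelope estimate; assumes uniform migration time per tenant
# and no overhead for spawning workers or opening connections.
tenants = 100
seconds_per_migration = 2
workers = 10

sequential_seconds = tenants * seconds_per_migration
# Each worker handles ceil(tenants / workers) tenants back to back.
parallel_seconds = (tenants.to_f / workers).ceil * seconds_per_migration

puts "sequential: #{sequential_seconds}s"
puts "parallel:   #{parallel_seconds}s"
```

Real runs will land somewhat above the parallel estimate, since migrations rarely take identical time and each worker has to establish its own connection.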
## Why This Design
Parallel database migrations introduce two categories of problems:
- **Platform-specific fork safety**: macOS/Windows have issues with libpq (PostgreSQL C library) after fork() due to GSS/Kerberos state corruption. Linux handles fork() cleanly. We auto-detect and choose the safe strategy.
- **PostgreSQL advisory lock deadlocks**: Rails uses advisory locks to prevent concurrent migrations. When multiple processes/threads migrate different schemas simultaneously, they deadlock competing for the same lock. We disable advisory locks during parallel execution, which means **you accept responsibility for ensuring your migrations are parallel-safe**.
## When to Use Parallel Migrations
This is an advanced feature. Use it when:
- You have many tenants and sequential migration time is problematic
- Your migrations only modify tenant-specific schema objects
- You’ve verified your migrations don’t have cross-schema side effects
Stick with sequential execution (the default) when:
- Migrations create/modify extensions, types, or shared objects
- Migrations have ordering dependencies across tenants
- You’re unsure whether parallel execution is safe for your use case
## Gotchas
- The `parallel_migration_threads` count should be less than your connection pool size to avoid connection exhaustion.
- Empty/nil tenant names from the `tenant_names` proc are filtered to prevent PostgreSQL “zero-length delimited identifier” errors.
- Process-based parallelism requires fresh connections in each fork; thread-based parallelism shares the pool but needs explicit checkout.
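The first gotcha can be expressed as a simple pre-flight check. This is a minimal sketch: `threads_fit_pool?` is a hypothetical helper, not part of Apartment, and in a real application the pool size would have to come from your own database configuration.

```ruby
# Hypothetical helper, not part of Apartment: true when the worker count
# is positive and strictly below the connection pool size.
def threads_fit_pool?(threads, pool_size)
  threads.positive? && threads < pool_size
end

puts threads_fit_pool?(4, 10)   # true
puts threads_fit_pool?(10, 10)  # false
```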
Defined Under Namespace
Classes: Result
Class Method Summary
- .create_tenant(tenant_name) ⇒ Object
  Create a tenant with logging.
- .display_summary(operation, results) ⇒ Object
  Display summary of operation results.
- .each_tenant {|String| ... } ⇒ Array<Result>
  Primary entry point for tenant iteration.
- .each_tenant_in_processes ⇒ Object
  Process-based parallelism via fork().
- .each_tenant_in_threads ⇒ Object
  Thread-based parallelism.
- .each_tenant_parallel ⇒ Object
  Parallel execution wrapper.
- .each_tenant_sequential ⇒ Object
  Sequential execution: simpler, no connection management complexity.
- .fork_safe_platform? ⇒ Boolean
  Platform detection.
- .migrate_tenant(tenant_name) ⇒ Object
  Migrate a single tenant with error handling based on strategy.
- .reconnect_for_parallel_execution ⇒ Object
  Re-establishes database connection for parallel execution.
- .resolve_parallel_strategy ⇒ Object
  Auto-detection logic for parallelism strategy.
- .tenants ⇒ Array<String>
  Get the list of all tenants to operate on. Supports the DB env var for targeting specific tenants. Filters out blank tenant names for safety.
- .tenants_without_default ⇒ Array<String>
  Get the list of tenants excluding the default tenant. Also filters out blank/empty tenant names to prevent errors.
- .warn_if_tenants_empty ⇒ Object
  Display warning if tenant list is empty.
- .with_advisory_locks_disabled ⇒ Object
  Advisory lock management.
Class Method Details
.create_tenant(tenant_name) ⇒ Object
Create a tenant with logging
```ruby
# File 'lib/apartment/tasks/task_helper.rb', line 269

def create_tenant(tenant_name)
  puts("Creating #{tenant_name} tenant")
  Apartment::Tenant.create(tenant_name)
rescue Apartment::TenantExists => e
  puts "Tried to create already existing tenant: #{e}"
end
```
.display_summary(operation, results) ⇒ Object
Display summary of operation results
```ruby
# File 'lib/apartment/tasks/task_helper.rb', line 249

def display_summary(operation, results)
  return if results.empty?

  succeeded = results.count(&:success)
  failed = results.reject(&:success)

  puts "\n=== #{operation} Summary ==="
  puts "Succeeded: #{succeeded}/#{results.size} tenants"

  return if failed.empty?

  puts "Failed: #{failed.size} tenants"
  failed.each do |result|
    puts "  - #{result.tenant}: #{result.error}"
  end
end
```
.each_tenant {|String| ... } ⇒ Array<Result>
Primary entry point for tenant iteration. Automatically selects sequential or parallel execution based on configuration.
```ruby
# File 'lib/apartment/tasks/task_helper.rb', line 63

def each_tenant(&)
  return [] if tenants_without_default.empty?

  if parallel_migration_threads.positive?
    each_tenant_parallel(&)
  else
    each_tenant_sequential(&)
  end
end
```
.each_tenant_in_processes ⇒ Object
Process-based parallelism via fork(). Faster on Linux due to copy-on-write memory and no GIL contention. Each forked process gets isolated memory, so we must clear inherited connections and establish fresh ones.
```ruby
# File 'lib/apartment/tasks/task_helper.rb', line 103

def each_tenant_in_processes
  Parallel.map(tenants_without_default, in_processes: parallel_migration_threads) do |tenant|
    # Forked processes inherit parent's connection handles but the
    # underlying sockets are invalid. Must reconnect before any DB work.
    ActiveRecord::Base.connection_handler.clear_all_connections!(:all)
    reconnect_for_parallel_execution

    Rails.application.executor.wrap do
      yield(tenant)
    end
    Result.new(tenant: tenant, success: true, error: nil)
  rescue StandardError => e
    Result.new(tenant: tenant, success: false, error: e.message)
  ensure
    ActiveRecord::Base.connection_handler.clear_all_connections!(:all)
  end
end
```
.each_tenant_in_threads ⇒ Object
Thread-based parallelism. Safe on all platforms but subject to GIL for CPU-bound work (migrations are typically I/O-bound, so this is fine). Threads share the connection pool, so we reconfigure once before spawning and restore after completion.
```ruby
# File 'lib/apartment/tasks/task_helper.rb', line 125

def each_tenant_in_threads
  original_config = ActiveRecord::Base.connection_db_config.configuration_hash
  reconnect_for_parallel_execution

  Parallel.map(tenants_without_default, in_threads: parallel_migration_threads) do |tenant|
    # Explicit connection checkout prevents pool exhaustion when
    # thread count exceeds pool size minus buffer.
    ActiveRecord::Base.connection_pool.with_connection do
      Rails.application.executor.wrap do
        yield(tenant)
      end
    end
    Result.new(tenant: tenant, success: true, error: nil)
  rescue StandardError => e
    Result.new(tenant: tenant, success: false, error: e.message)
  end
ensure
  ActiveRecord::Base.connection_handler.clear_all_connections!(:all)
  ActiveRecord::Base.establish_connection(original_config) if original_config
end
```
.each_tenant_parallel ⇒ Object
Parallel execution wrapper. Disables advisory locks for the duration, then delegates to platform-appropriate parallelism strategy.
```ruby
# File 'lib/apartment/tasks/task_helper.rb', line 88

def each_tenant_parallel(&)
  with_advisory_locks_disabled do
    case resolve_parallel_strategy
    when :processes
      each_tenant_in_processes(&)
    else
      each_tenant_in_threads(&)
    end
  end
end
```
.each_tenant_sequential ⇒ Object
Sequential execution: simpler, no connection management complexity. Used when parallel_migration_threads is 0 (the default).
```ruby
# File 'lib/apartment/tasks/task_helper.rb', line 75

def each_tenant_sequential
  tenants_without_default.map do |tenant|
    Rails.application.executor.wrap do
      yield(tenant)
    end
    Result.new(tenant: tenant, success: true, error: nil)
  rescue StandardError => e
    Result.new(tenant: tenant, success: false, error: e.message)
  end
end
```
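The per-tenant rescue is what lets one failing tenant be recorded without aborting the rest of the run. Below is a minimal, Rails-free sketch of that pattern. `collect_results` is a hypothetical stand-in that returns plain hashes where the helper returns Result objects, and the tenant names are made up.

```ruby
# Hypothetical stand-in for the sequential loop: runs the block once per
# tenant and turns any StandardError into a failed entry instead of raising.
def collect_results(tenant_list)
  tenant_list.map do |tenant|
    yield(tenant)
    { tenant: tenant, success: true, error: nil }
  rescue StandardError => e
    { tenant: tenant, success: false, error: e.message }
  end
end

results = collect_results(%w[acme globex]) do |tenant|
  raise ArgumentError, 'boom' if tenant == 'globex'
end

results.each { |r| puts r.inspect }
```

Both tenants appear in the returned array, so a caller can report failures after the whole run rather than stopping at the first one.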
.fork_safe_platform? ⇒ Boolean
Platform detection. Conservative: only Linux is considered fork-safe. macOS has documented issues with libpq, GSS-API, and Kerberos after fork. See: www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNECT-GSSENCMODE
```ruby
# File 'lib/apartment/tasks/task_helper.rb', line 160

def fork_safe_platform?
  RUBY_PLATFORM.include?('linux')
end
```
.migrate_tenant(tenant_name) ⇒ Object
Migrate a single tenant with error handling based on strategy
```ruby
# File 'lib/apartment/tasks/task_helper.rb', line 279

def migrate_tenant(tenant_name)
  strategy = Apartment.db_migrate_tenant_missing_strategy
  create_tenant(tenant_name) if strategy == :create_tenant

  puts("Migrating #{tenant_name} tenant")
  Apartment::Migrator.migrate(tenant_name)
rescue Apartment::TenantNotFound => e
  raise(e) if strategy == :raise_exception

  puts e.message
end
```
.reconnect_for_parallel_execution ⇒ Object
Re-establishes database connection for parallel execution. When manage_advisory_locks is true, disables advisory locks in the connection config (belt-and-suspenders with the ENV var approach). When false, reconnects with existing config unchanged.
```ruby
# File 'lib/apartment/tasks/task_helper.rb', line 197

def reconnect_for_parallel_execution
  current_config = ActiveRecord::Base.connection_db_config.configuration_hash

  new_config = if Apartment.manage_advisory_locks
                 current_config.merge(advisory_locks: false)
               else
                 current_config
               end

  ActiveRecord::Base.establish_connection(new_config)
end
```
.resolve_parallel_strategy ⇒ Object
Auto-detection logic for parallelism strategy. Only Linux gets process-based parallelism by default due to macOS libpq fork issues.
```ruby
# File 'lib/apartment/tasks/task_helper.rb', line 148

def resolve_parallel_strategy
  strategy = Apartment.parallel_strategy

  return :threads if strategy == :threads
  return :processes if strategy == :processes

  fork_safe_platform? ? :processes : :threads
end
```
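The resolution rule is easier to see as a pure function. The sketch below is a hypothetical rewrite that takes the configured value and the platform string as arguments; `pick_strategy` and the `:auto` value are illustrative assumptions, standing for any setting other than `:threads` or `:processes`.

```ruby
# Hypothetical pure-function version of the strategy resolution: an explicit
# :threads or :processes setting wins, anything else falls back to the
# platform check (only Linux is treated as fork-safe).
def pick_strategy(configured, platform)
  return configured if %i[threads processes].include?(configured)

  platform.include?('linux') ? :processes : :threads
end

puts pick_strategy(:auto, 'x86_64-linux')       # processes
puts pick_strategy(:auto, 'arm64-darwin23')     # threads
puts pick_strategy(:processes, 'arm64-darwin23') # processes
```

The last call shows that an explicit `:processes` setting bypasses the platform check, so forcing it on macOS is the caller's responsibility.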
.tenants ⇒ Array<String>
Get the list of all tenants to operate on. Supports the DB env var for targeting specific tenants. Filters out blank tenant names for safety.
```ruby
# File 'lib/apartment/tasks/task_helper.rb', line 225

def tenants
  result = ENV['DB'] ? ENV['DB'].split(',').map(&:strip) : Apartment.tenant_names || []
  result.reject { |t| t.nil? || t.to_s.strip.empty? }
end
```
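To make the filtering behavior concrete, here is a self-contained sketch of the same logic with the inputs passed explicitly. `tenant_list` is a hypothetical stand-in: `db_env` plays the role of `ENV['DB']` and `configured` the role of `Apartment.tenant_names`; the tenant names are made up.

```ruby
# Hypothetical stand-in for the tenants lookup. A comma-separated db_env
# takes precedence; otherwise the configured list (or an empty array) is used.
def tenant_list(db_env, configured)
  names = db_env ? db_env.split(',').map(&:strip) : (configured || [])
  # Drop nil and whitespace-only names so they never reach PostgreSQL.
  names.reject { |t| t.nil? || t.to_s.strip.empty? }
end

p tenant_list('acme, ,globex', %w[ignored]) # ["acme", "globex"]
p tenant_list(nil, ['acme', nil, ''])       # ["acme"]
p tenant_list(nil, nil)                     # []
```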
.tenants_without_default ⇒ Array<String>
Get the list of tenants excluding the default tenant. Also filters out blank/empty tenant names to prevent errors.
```ruby
# File 'lib/apartment/tasks/task_helper.rb', line 216

def tenants_without_default
  (tenants - [Apartment.default_tenant]).reject { |t| t.nil? || t.to_s.strip.empty? }
end
```
.warn_if_tenants_empty ⇒ Object
Display warning if tenant list is empty
```ruby
# File 'lib/apartment/tasks/task_helper.rb', line 231

def warn_if_tenants_empty
  return unless tenants.empty? && ENV['IGNORE_EMPTY_TENANTS'] != 'true'

  puts <<~WARNING
    [WARNING] - The list of tenants to migrate appears to be empty. This could mean a few things:

      1. You may not have created any, in which case you can ignore this message
      2. You've run `apartment:migrate` directly without loading the Rails environment
        * `apartment:migrate` is now deprecated. Tenants will automatically be migrated with `db:migrate`

    Note that your tenants currently haven't been migrated. You'll need to run `db:migrate` to rectify this.
  WARNING
end
```
.with_advisory_locks_disabled ⇒ Object
Advisory lock management. Rails acquires pg_advisory_lock during migrations to prevent concurrent schema changes. With parallel tenant migrations, this causes deadlocks since all workers compete for the same lock.
Important: Disabling advisory locks shifts responsibility to you. Your migrations must be safe to run concurrently across tenants. If your migrations modify shared resources, create extensions, or have other cross-schema side effects, parallel execution may cause failures. When in doubt, use sequential execution (parallel_migration_threads = 0).
Uses ENV var because Rails checks it at connection establishment time, and we need it disabled before Parallel spawns workers.
```ruby
# File 'lib/apartment/tasks/task_helper.rb', line 176

def with_advisory_locks_disabled
  return yield unless parallel_migration_threads.positive?
  return yield unless Apartment.manage_advisory_locks

  original_env_value = ENV.fetch('DISABLE_ADVISORY_LOCKS', nil)
  begin
    ENV['DISABLE_ADVISORY_LOCKS'] = 'true'
    yield
  ensure
    if original_env_value.nil?
      ENV.delete('DISABLE_ADVISORY_LOCKS')
    else
      ENV['DISABLE_ADVISORY_LOCKS'] = original_env_value
    end
  end
end
```
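The save-and-restore handling of the environment variable can be tried in isolation. The sketch below is a generic, hypothetical `with_env` helper (not part of Apartment) that follows the same steps; the key `APARTMENT_DOC_DEMO` is a made-up name chosen to avoid touching real settings.

```ruby
# Hypothetical generic version of the pattern: set an ENV key for the
# duration of the block, then restore the previous value or remove the
# key if it was not set before.
def with_env(key, value)
  original = ENV.fetch(key, nil)
  ENV[key] = value
  yield
ensure
  if original.nil?
    ENV.delete(key)
  else
    ENV[key] = original
  end
end

ENV.delete('APARTMENT_DOC_DEMO')
inside = with_env('APARTMENT_DOC_DEMO', 'true') { ENV['APARTMENT_DOC_DEMO'] }

puts inside.inspect
puts ENV['APARTMENT_DOC_DEMO'].inspect
```

Because the restore runs in `ensure`, the variable is reset even when the block raises, which keeps a failed parallel run from leaving advisory locks disabled for later tasks in the same process.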