Class: Kaal::Backend::PostgresAdapter

Inherits:
Adapter
  • Object
Includes:
DispatchLogging
Defined in:
lib/kaal/backend/postgres_adapter.rb

Overview

Distributed backend adapter using PostgreSQL advisory locks.

This adapter uses PostgreSQL’s pg_try_advisory_lock function for distributed locking across multiple nodes. Locks are connection-based and automatically released when the database connection is closed.

**IMPORTANT LIMITATIONS:**

  • The ttl parameter is ignored. Locks do not auto-expire based on time; they persist until the database connection terminates.

  • If a process crashes while holding a lock, the lock remains held until PostgreSQL ends the session, which may not happen until the connection timeout occurs (typically 30-60 minutes). For critical systems, consider monitoring stale locks or using a time-based fallback mechanism.

  • Ensure connection pooling is properly configured to release connections promptly when processes terminate.
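
One way to monitor stale locks, as suggested above, is to query PostgreSQL's `pg_locks` and `pg_stat_activity` system views. The following is a minimal sketch, not part of Kaal: the helper name `stale_advisory_locks` and the 10-minute threshold are assumptions made for illustration, and `connection` is assumed to respond to `execute` the way an ActiveRecord connection does.

```ruby
# Hypothetical monitoring helper (not part of Kaal).
# Lists sessions that hold a granted advisory lock and have not changed
# state for longer than an arbitrary threshold (10 minutes here).
STALE_ADVISORY_LOCKS_SQL = <<~SQL
  SELECT l.pid, l.classid, l.objid, a.state, a.state_change
  FROM pg_locks l
  JOIN pg_stat_activity a ON a.pid = l.pid
  WHERE l.locktype = 'advisory'
    AND l.granted
    AND a.state_change < now() - interval '10 minutes'
SQL

# connection: any object with an ActiveRecord-style #execute(sql) method.
def stale_advisory_locks(connection)
  connection.execute(STALE_ADVISORY_LOCKS_SQL).to_a
end
```

With ActiveRecord this could be called as `stale_advisory_locks(ActiveRecord::Base.connection)`. For 64-bit lock keys, `classid` and `objid` hold the upper and lower 32 bits of the key.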

Optionally logs all dispatch attempts to the database when enable_log_dispatch_registry is enabled in configuration.

Examples:

Using the PostgreSQL adapter

Kaal.configure do |config|
  config.backend = Kaal::Backend::PostgresAdapter.new
  config.enable_log_dispatch_registry = true  # Enable dispatch logging
end

Instance Method Summary

Methods included from DispatchLogging

#log_dispatch_attempt, #parse_lock_key, parse_lock_key

Methods inherited from Adapter

#with_lock

Constructor Details

#initialize ⇒ PostgresAdapter

Initialize a new PostgreSQL adapter.



# File 'lib/kaal/backend/postgres_adapter.rb', line 44

def initialize
  super
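  # Largest signed 64-bit integer (2**63 - 1), the upper bound of a PostgreSQL bigint
  @signed_64_max = 9_223_372_036_854_775_807
  # Number of distinct 64-bit values (2**64)
  @unsigned_64_range = 18_446_744_073_709_551_616
  # Matches result values treated as false: "f", "false", "0", or an empty string
  @false_value_pattern = /\A(f|false|0|)\z/i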
end
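
The false-value pattern suggests how raw query results are turned into booleans. The private `cast_to_boolean` helper is not shown on this page, so the following is only a plausible sketch under that assumption; the constant name and the nil handling are illustrative choices.

```ruby
# Sketch only: the adapter's real cast_to_boolean is not shown in this document.
FALSE_VALUE_PATTERN = /\A(f|false|0|)\z/i

# Treats nil, "f", "false", "0", and "" as false; everything else as true.
# PostgreSQL drivers may return booleans as "t"/"f" strings or as true/false,
# depending on type mapping, so the value is normalized via to_s first.
def cast_to_boolean(value)
  return false if value.nil?

  !FALSE_VALUE_PATTERN.match?(value.to_s.strip)
end
```

Matching on the false values rather than the true ones means any unexpected non-empty value is read as "lock acquired", which is a design choice worth keeping in mind when debugging.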

Instance Method Details

#acquire(key, _ttl) ⇒ Boolean

Attempt to acquire a distributed lock using PostgreSQL advisory lock.

Converts the lock key to a deterministic 64-bit integer hash and attempts to acquire the advisory lock. If successful, logs the dispatch attempt when enable_log_dispatch_registry is enabled.

Note: The ttl parameter is ignored. PostgreSQL advisory locks are connection-based and do not auto-expire. The lock will be held until the database connection is closed. See class documentation for limitations.

Parameters:

  • key (String)

    the lock key (format: “namespace:dispatch:cron_key:fire_time”)

  • _ttl (Integer)

    time-to-live in seconds (ignored; see class docs)

Returns:

  • (Boolean)

    true if acquired, false if held by another process



# File 'lib/kaal/backend/postgres_adapter.rb', line 81

def acquire(key, _ttl)
  lock_id = calculate_lock_id(key)

  sql = ActiveRecord::Base.sanitize_sql_array(['SELECT pg_try_advisory_lock(?)', lock_id])
  acquired = cast_to_boolean(ActiveRecord::Base.connection.execute(sql).first['pg_try_advisory_lock'])

  log_dispatch_attempt(key) if acquired

  acquired
rescue StandardError => e
  raise LockAdapterError, "PostgreSQL acquire failed for #{key}: #{e.message}"
end
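
The private `calculate_lock_id` method is not shown on this page. As a rough illustration of how a string key could be converted to a deterministic 64-bit integer that fits PostgreSQL's signed `bigint` argument, here is a sketch that assumes SHA-256 as the hash. The method name `lock_id_for` and the choice of digest are assumptions, not the adapter's confirmed implementation.

```ruby
require 'digest'

# Sketch only: the hash function actually used by the adapter is not documented here.
SIGNED_64_MAX = 9_223_372_036_854_775_807       # 2**63 - 1
UNSIGNED_64_RANGE = 18_446_744_073_709_551_616  # 2**64

# Maps a lock key to a signed 64-bit integer, as pg_try_advisory_lock(bigint) expects.
def lock_id_for(key)
  # Read the first 8 bytes of the digest as a big-endian unsigned 64-bit integer.
  unsigned = Digest::SHA256.digest(key).unpack1('Q>')
  # Values above the signed maximum wrap around into the negative range.
  unsigned > SIGNED_64_MAX ? unsigned - UNSIGNED_64_RANGE : unsigned
end
```

Because the digest depends only on the key, every node computes the same lock ID for the same key, which is what makes the lock distributed. Distinct keys can in principle collide on the same 64-bit ID, though this is unlikely.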

#definition_registry ⇒ Kaal::Definition::DatabaseEngine

Get the definition registry for database-backed definition persistence.

Returns:

  • (Kaal::Definition::DatabaseEngine)

    the memoized definition registry



# File 'lib/kaal/backend/postgres_adapter.rb', line 63

def definition_registry
  @definition_registry ||= Kaal::Definition::DatabaseEngine.new
end

#dispatch_registry ⇒ Kaal::Dispatch::DatabaseEngine

Get the dispatch registry for database logging.

Returns:

  • (Kaal::Dispatch::DatabaseEngine)

    the memoized dispatch registry



# File 'lib/kaal/backend/postgres_adapter.rb', line 55

def dispatch_registry
  @dispatch_registry ||= Kaal::Dispatch::DatabaseEngine.new
end

#release(key) ⇒ Boolean

Release a distributed lock held by PostgreSQL advisory lock.

Parameters:

  • key (String)

    the lock key

Returns:

  • (Boolean)

    true if released, false if not held



# File 'lib/kaal/backend/postgres_adapter.rb', line 99

def release(key)
  lock_id = calculate_lock_id(key)

  sql = ActiveRecord::Base.sanitize_sql_array(['SELECT pg_advisory_unlock(?)', lock_id])
  cast_to_boolean(ActiveRecord::Base.connection.execute(sql).first['pg_advisory_unlock'])
rescue StandardError => e
  raise LockAdapterError, "PostgreSQL release failed for #{key}: #{e.message}"
end
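
Because the lock never expires on its own, callers that use #acquire and #release directly need to release in an `ensure` block. The sketch below shows that pattern with a hypothetical helper, `run_exclusively`, which is not part of Kaal. The inherited #with_lock may already provide something similar, but its signature is not shown on this page.

```ruby
# Hypothetical helper (not part of Kaal): runs the block only if the lock
# is acquired, and always releases the lock afterwards.
# adapter: any object responding to acquire(key, ttl) and release(key).
def run_exclusively(adapter, key, ttl: 60)
  # Another process holds the lock; skip the work.
  return false unless adapter.acquire(key, ttl)

  begin
    yield
  ensure
    # Runs even if the block raises, so the lock is not left held.
    adapter.release(key)
  end
  true
end
```

Advisory locks belong to the database session that took them, so #release only succeeds when it runs on the same connection as the matching #acquire. With a connection pool, that usually means acquiring and releasing on the same thread without returning the connection in between.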