Class: ChronoForge::Executor::LockStrategy

Inherits:
Object
  • Object
show all
Defined in:
lib/chrono_forge/executor/lock_strategy.rb

Class Method Summary collapse

Class Method Details

.acquire_lock(job_id, workflow, max_duration:) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/chrono_forge/executor/lock_strategy.rb', line 9

# Acquires the execution lock on +workflow+ on behalf of +job_id+.
#
# Everything runs inside a single +ActiveRecord::Base.transaction+: the
# workflow is locked via +lock!+, checked with +ensure_executable!+, and
# rejected if another job holds a lock newer than +max_duration.ago+. A lock
# older than that is treated as stale and is taken over.
#
# @param job_id [Object] identifier stored in +locked_by+ on success
# @param workflow [#lock!, #locked_at, #locked_by, #key, #update_columns]
#   the workflow record to lock
# @param max_duration [#ago] how long an existing lock is honoured before
#   it is considered stale
# @return [Object] the workflow returned by +lock!+, with +locked_by+,
#   +locked_at+ and +state+ updated
# @raise [ConcurrentExecutionError] if the workflow carries a lock younger
#   than +max_duration+
def acquire_lock(job_id, workflow, max_duration:)
  ActiveRecord::Base.transaction do
    # Take the lock first so the checks below see the current lock columns.
    workflow = workflow.lock!

    ensure_executable!(workflow)

    # A lock younger than max_duration means another job is still executing.
    if workflow.locked_at && workflow.locked_at > max_duration.ago
      # The key must be read from the workflow: there is no local `key` in
      # this method, so a bare `key` would fail before this error is raised.
      # NOTE(review): `self.class` is the class of this method's receiver,
      # not of the workflow — confirm that is the intended label.
      raise ConcurrentExecutionError,
        "ChronoForge:#{self.class}(#{workflow.key}) job(#{job_id}) failed to acquire lock. " \
        "Currently being executed by job(#{workflow.locked_by})"
    end

    # Record the new owner, the acquisition time and the running state in one write.
    workflow.update_columns(
      locked_by: job_id,
      locked_at: Time.current,
      state: :running
    )

    Rails.logger.debug { "ChronoForge:#{self.class}(#{workflow.key}) job(#{job_id}) acquired lock." }

    workflow
  end
end

.release_lock(job_id, workflow, force: false) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/chrono_forge/executor/lock_strategy.rb', line 36

# Releases the execution lock held on +workflow+.
#
# The workflow is reloaded first. Unless +force+ is set, the lock must still
# belong to +job_id+; otherwise another job has taken it over and an error
# is raised without touching the record.
#
# @param job_id [Object] identifier of the job releasing the lock
# @param workflow [#reload] the workflow record to unlock
# @param force [Boolean] skip the ownership check and always reset the state
# @return [Object] whatever +update_columns+ returns
# @raise [LongRunningConcurrentExecutionError] if the lock is owned by a
#   different job and +force+ is false
def release_lock(job_id, workflow, force: false)
  record = workflow.reload
  lock_owned = force || record.locked_by == job_id

  unless lock_owned
    message = "ChronoForge:#{self.class}(#{record.key}) job(#{job_id}) executed longer than specified max_duration, " \
      "allowed job(#{record.locked_by}) to acquire the lock."
    raise LongRunningConcurrentExecutionError, message
  end

  # The state only goes back to idle when forced or while still running.
  reset_state = force || record.running?
  cleared = {locked_at: nil, locked_by: nil}
  cleared = cleared.merge(state: :idle) if reset_state

  record.update_columns(cleared)
end