Class: ChronoForge::Executor::LockStrategy
- Inherits:
- Object
- Object
- ChronoForge::Executor::LockStrategy
- Defined in:
- lib/chrono_forge/executor/lock_strategy.rb
Class Method Summary
- .acquire_lock(job_id, workflow, max_duration:) ⇒ Object
- .release_lock(job_id, workflow, force: false) ⇒ Object
Class Method Details
.acquire_lock(job_id, workflow, max_duration:) ⇒ Object
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/chrono_forge/executor/lock_strategy.rb', line 9

# Claims the execution lock on +workflow+ for the job identified by +job_id+.
#
# Everything runs inside one +ActiveRecord::Base.transaction+: the workflow is
# locked with +lock!+, passed to +ensure_executable!+, checked for a lock that
# is still fresh, and then stamped with the new owner, the current time and
# the +:running+ state. A lock whose +locked_at+ is not newer than
# +max_duration.ago+ is treated as stale and is taken over.
#
# @param job_id [Object] identifier of the calling job; written to +locked_by+
# @param workflow [Object] record responding to +lock!+, +locked_at+,
#   +locked_by+, +key+ and +update_columns+
# @param max_duration [#ago] how long a lock counts as live (anything that
#   responds to +ago+ with a comparable time)
# @return [Object] the locked workflow (the value of the transaction block)
# @raise [ConcurrentExecutionError] when another job holds a lock newer than
#   +max_duration.ago+
def acquire_lock(job_id, workflow, max_duration:)
  ActiveRecord::Base.transaction do
    # Find the workflow with a lock, considering stale locks
    workflow = workflow.lock!

    # Defined outside this method; presumably raises when the workflow must
    # not be run -- TODO confirm against its definition.
    ensure_executable!(workflow)

    # Check for active execution: a lock newer than max_duration.ago still
    # belongs to another job, anything older is considered abandoned.
    if workflow.locked_at && workflow.locked_at > max_duration.ago
      # The key has to be read from the workflow; there is no bare `key` in
      # this scope, so interpolating it would fail before the intended error
      # could be raised.
      # NOTE(review): in a class-level method `self.class` interpolates as
      # `Class` -- confirm whether the workflow's class was intended here.
      raise ConcurrentExecutionError,
        "ChronoForge:#{self.class}(#{workflow.key}) job(#{job_id}) failed to acquire lock. " \
        "Currently being executed by job(#{workflow.locked_by})"
    end

    # Record the new owner, timestamp and state in a single column write.
    workflow.update_columns(
      locked_by: job_id,
      locked_at: Time.current,
      state: :running
    )

    Rails.logger.debug { "ChronoForge:#{self.class}(#{workflow.key}) job(#{job_id}) acquired lock." }

    workflow
  end
end
.release_lock(job_id, workflow, force: false) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/chrono_forge/executor/lock_strategy.rb', line 36

# Releases the execution lock on +workflow+.
#
# The workflow is reloaded first. Unless +force+ is set, the lock may only be
# released by the job recorded in +locked_by+; any other caller gets a
# LongRunningConcurrentExecutionError. On release, +locked_at+ and +locked_by+
# are cleared, and +state+ is set to +:idle+ when +force+ is set or the
# workflow reports +running?+.
#
# @param job_id [Object] identifier of the job releasing the lock
# @param workflow [Object] record responding to +reload+, +locked_by+, +key+,
#   +running?+ and +update_columns+
# @param force [Boolean] skip the ownership check and always reset the state
# @return [Object] whatever +update_columns+ returns
# @raise [LongRunningConcurrentExecutionError] when +force+ is false and the
#   lock is recorded as held by a different job
def release_lock(job_id, workflow, force: false)
  current = workflow.reload

  # Ownership is only inspected when the release is not forced.
  unless force || current.locked_by == job_id
    message =
      "ChronoForge:#{self.class}(#{current.key}) job(#{job_id}) executed longer than specified max_duration, " \
      "allowed job(#{current.locked_by}) to acquire the lock."
    raise LongRunningConcurrentExecutionError, message
  end

  cleared = {locked_at: nil, locked_by: nil}
  # The state is reset on a forced release, or when the workflow is running.
  cleared = cleared.merge(state: :idle) if force || current.running?

  current.update_columns(cleared)
end