Class: SidekiqUniqueJobs::Lock::WhileExecuting
- Inherits:
-
Object
- Object
- SidekiqUniqueJobs::Lock::WhileExecuting
- Defined in:
- lib/sidekiq_unique_jobs/lock/while_executing.rb
Constant Summary collapse
- MUTEX =
Mutex.new
Class Method Summary collapse
Instance Method Summary collapse
- #create_digest ⇒ Object
- #execute(_callback) ⇒ Object
-
#initialize(item, redis_pool = nil) ⇒ WhileExecuting
constructor
A new instance of WhileExecuting.
- #locked? ⇒ Boolean
- #max_lock_time ⇒ Object
- #synchronize ⇒ Object
Constructor Details
#initialize(item, redis_pool = nil) ⇒ WhileExecuting
Returns a new instance of WhileExecuting.
12 13 14 15 16 |
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 12 def initialize(item, redis_pool = nil) @item = item @redis_pool = redis_pool @unique_digest = "#{create_digest}:run" end |
Class Method Details
.synchronize(item, redis_pool = nil) ⇒ Object
8 9 10 |
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 8 def self.synchronize(item, redis_pool = nil) new(item, redis_pool).synchronize { yield } end |
Instance Method Details
#create_digest ⇒ Object
47 48 49 50 |
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 47 def create_digest @unique_digest ||= @item[UNIQUE_DIGEST_KEY] @unique_digest ||= SidekiqUniqueJobs::UniqueArgs.digest(@item) end |
#execute(_callback) ⇒ Object
41 42 43 44 45 |
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 41 def execute(_callback) synchronize do yield end end |
#locked? ⇒ Boolean
31 32 33 34 35 |
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 31 def locked? Scripts.call(:synchronize, @redis_pool, keys: [@unique_digest], argv: [Time.now.to_i, max_lock_time]) == 1 end |
#max_lock_time ⇒ Object
37 38 39 |
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 37 def max_lock_time @max_lock_time ||= RunLockTimeoutCalculator.for_item(@item).seconds end |
#synchronize ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 18 def synchronize MUTEX.lock sleep 0.1 until locked? yield rescue Sidekiq::Shutdown logger.fatal { "the unique_key: #{@unique_digest} needs to be unlocked manually" } raise ensure SidekiqUniqueJobs.connection(@redis_pool) { |conn| conn.del @unique_digest } MUTEX.unlock end |