Class: SidekiqUniqueJobs::Lock::WhileExecuting

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq_unique_jobs/lock/while_executing.rb

Constant Summary collapse

MUTEX =
Mutex.new

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(item, redis_pool = nil) ⇒ WhileExecuting

Returns a new instance of WhileExecuting.



12
13
14
15
16
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 12

def initialize(item, redis_pool = nil)
  @item = item
  @redis_pool = redis_pool
  @unique_digest = "#{create_digest}:run"
end

Class Method Details

.synchronize(item, redis_pool = nil) ⇒ Object



8
9
10
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 8

def self.synchronize(item, redis_pool = nil)
  new(item, redis_pool).synchronize { yield }
end

Instance Method Details

#create_digestObject



47
48
49
50
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 47

def create_digest
  @unique_digest ||= @item[UNIQUE_DIGEST_KEY]
  @unique_digest ||= SidekiqUniqueJobs::UniqueArgs.digest(@item)
end

#execute(_callback) ⇒ Object



41
42
43
44
45
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 41

def execute(_callback)
  synchronize do
    yield
  end
end

#locked?Boolean

Returns:

  • (Boolean)


31
32
33
34
35
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 31

def locked?
  Scripts.call(:synchronize, @redis_pool,
               keys: [@unique_digest],
               argv: [Time.now.to_i, max_lock_time]) == 1
end

#max_lock_timeObject



37
38
39
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 37

def max_lock_time
  @max_lock_time ||= RunLockTimeoutCalculator.for_item(@item).seconds
end

#synchronizeObject



18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 18

def synchronize
  MUTEX.lock
  sleep 0.1 until locked?

  yield
rescue Sidekiq::Shutdown
  logger.fatal { "the unique_key: #{@unique_digest} needs to be unlocked manually" }
  raise
ensure
  SidekiqUniqueJobs.connection(@redis_pool) { |conn| conn.del @unique_digest }
  MUTEX.unlock
end