Class: SidekiqUniqueJobs::Lock::WhileExecuting

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq_unique_jobs/lock/while_executing.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(item, redis_pool = nil) ⇒ WhileExecuting

Returns a new instance of WhileExecuting.


10
11
12
13
14
15
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 10

# Builds a lock for +item+.
#
# @param item [Object] job payload; looked up by UNIQUE_DIGEST_KEY in
#   #create_digest (presumably the Sidekiq job hash — confirm with callers)
# @param redis_pool [Object, nil] handed through to the Redis helpers as-is
def initialize(item, redis_pool = nil)
  @redis_pool = redis_pool
  @mutex = Mutex.new
  @item = item
  # create_digest reads @item, so it has to run after the assignment above.
  digest = create_digest
  @unique_digest = "#{digest}:run"
end

Class Method Details

.synchronize(item, redis_pool = nil) ⇒ Object


6
7
8
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 6

# Convenience wrapper: builds a lock for +item+ and runs the given block
# through the instance-level #synchronize.
#
# @param item [Object] forwarded to .new
# @param redis_pool [Object, nil] forwarded to .new
# @return [Object] whatever the instance-level #synchronize returns
def self.synchronize(item, redis_pool = nil)
  lock = new(item, redis_pool)
  lock.synchronize do
    yield
  end
end

Instance Method Details

#create_digest ⇒ Object


45
46
47
48
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 45

# Memoized digest for the item: the value stored under UNIQUE_DIGEST_KEY
# when it is truthy, otherwise the one computed by
# SidekiqUniqueJobs::UniqueArgs.digest.
#
# @return [Object] the digest (type depends on the item / UniqueArgs)
def create_digest
  return @create_digest if @create_digest

  @create_digest = @item[UNIQUE_DIGEST_KEY] || SidekiqUniqueJobs::UniqueArgs.digest(@item)
end

#execute(_callback) ⇒ Object


39
40
41
42
43
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 39

# Runs the given block inside #synchronize.
#
# @param _callback [Object] accepted but not used by this method
# @return [Object] whatever #synchronize returns
def execute(_callback)
  synchronize { yield }
end

#locked? ⇒ Boolean

Returns:

  • (Boolean)

29
30
31
32
33
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 29

# Calls the :synchronize script with the digest key and
# [current epoch seconds, max_lock_time] as arguments.
#
# @return [Boolean] true only when the script's reply equals 1
def locked?
  now = Time.now.to_i
  ttl = max_lock_time
  reply = Scripts.call(:synchronize, @redis_pool, keys: [@unique_digest], argv: [now, ttl])
  reply == 1
end

#max_lock_time ⇒ Object


35
36
37
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 35

# Memoized +seconds+ value of RunLockTimeoutCalculator.for_item(@item);
# passed to the :synchronize script by #locked?.
#
# @return [Object] the calculator's +seconds+ value
def max_lock_time
  return @max_lock_time if @max_lock_time

  calculator = RunLockTimeoutCalculator.for_item(@item)
  @max_lock_time = calculator.seconds
end

#synchronize ⇒ Object


17
18
19
20
21
22
23
24
25
26
27
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 17

# Runs the given block while holding both the in-process mutex and the
# lock stored under @unique_digest.
#
# Polls #locked? every 0.1 seconds until it returns true, then yields.
# The key is deleted afterwards only when this call actually obtained the
# lock. If waiting is interrupted (shutdown, or an error raised by
# #locked?) the key may be held by another worker, so it is left alone.
#
# @yield the work to perform while the lock is held
# @return [Object] the block's return value
# @raise [Sidekiq::Shutdown] re-raised after being logged
def synchronize
  acquired = false
  @mutex.synchronize do
    # A true result from locked? is treated as "this call now holds the lock".
    sleep 0.1 until (acquired = locked?)
    yield
  end
rescue Sidekiq::Shutdown
  logger.fatal { "the unique_key: #{@unique_digest} needs to be unlocked manually" }
  raise
ensure
  # Never release a lock this call did not obtain.
  SidekiqUniqueJobs.connection(@redis_pool) { |conn| conn.del @unique_digest } if acquired
end