Class: SidekiqUniqueJobs::Lock::WhileExecuting

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq_unique_jobs/lock/while_executing.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(item, redis_pool = nil) ⇒ WhileExecuting

Returns a new instance of WhileExecuting.



8
9
10
11
12
13
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 8

def initialize(item, redis_pool = nil)
  @item = item
  @mutex = Mutex.new
  @redis_pool = redis_pool
  @unique_digest = "#{create_digest}:run"
end

Class Method Details

.synchronize(item, redis_pool = nil) ⇒ Object



4
5
6
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 4

def self.synchronize(item, redis_pool = nil)
  new(item, redis_pool).synchronize { yield }
end

Instance Method Details

#create_digestObject



44
45
46
47
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 44

def create_digest
  @unique_digest ||= @item[UNIQUE_DIGEST_KEY]
  @unique_digest ||= SidekiqUniqueJobs::UniqueArgs.digest(@item)
end

#execute(_callback) ⇒ Object



38
39
40
41
42
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 38

def execute(_callback)
  synchronize do
    yield
  end
end

#locked?Boolean

Returns:

  • (Boolean)


28
29
30
31
32
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 28

def locked?
  Scripts.call(:synchronize, @redis_pool,
               keys: [@unique_digest],
               argv: [Time.now.to_i, max_lock_time]) == 1
end

#max_lock_timeObject



34
35
36
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 34

def max_lock_time
  @max_lock_time ||= RunLockTimeoutCalculator.for_item(@item).seconds
end

#synchronizeObject



15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/sidekiq_unique_jobs/lock/while_executing.rb', line 15

def synchronize
  @mutex.lock
  sleep 0.1 until locked?

  yield
rescue Sidekiq::Shutdown
  logger.fatal { "the unique_key: #{@unique_digest} needs to be unlocked manually" }
  raise
ensure
  SidekiqUniqueJobs.connection(@redis_pool) { |c| c.del @unique_digest }
  @mutex.unlock
end