Module: Amigo::SemaphoreBackoffJob::PrependedMethods

Defined in:
lib/amigo/semaphore_backoff_job.rb

Instance Method Summary collapse

Instance Method Details

#perform(*args) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/amigo/semaphore_backoff_job.rb', line 94

# Runs the job only while a slot in its semaphore is free; otherwise re-enqueues it.
#
# The optional +before_perform+ hook is invoked first. When
# Amigo::SemaphoreBackoffJob is disabled, the call is handed straight to +super+.
#
# The semaphore is a plain Redis counter stored under +semaphore_key+:
# - Every call increments it; the call that moves it to 1 also gives the key
#   a TTL of +semaphore_expiry+.
# - If the incremented value is above +semaphore_size+, the increment is undone
#   and the job is rescheduled through +perform_in+ after +semaphore_backoff+.
# - Otherwise the job runs, and the counter is decremented afterwards
#   whether or not the job raised.
#
# A counter that drops below zero is treated as corrupt: the key is deleted so the
# semaphore restarts from its default state, and a warning is logged. That can
# briefly admit more jobs than intended, but it guarantees at least the configured
# number can run, and the key expiry rebalances things eventually.
#
# @param args [Array] job arguments, forwarded to +super+ or to +perform_in+.
# @return [Object, nil] the value of +super+, or nil when the job was rescheduled.
def perform(*args)
  self.before_perform(*args) if self.respond_to?(:before_perform)
  return super unless ::Amigo::SemaphoreBackoffJob.enabled?

  slot_key = self.semaphore_key
  capacity = self.semaphore_size

  # Claim a slot unconditionally; only the first claimant arms the expiry.
  occupied = Sidekiq.redis do |redis|
    redis.incr(slot_key).tap do |count|
      redis.expire(slot_key, self.semaphore_expiry) if count == 1
    end
  end

  unless occupied > capacity
    begin
      return super
    ensure
      # Give the slot back even if the job blew up.
      Sidekiq.redis do |redis|
        remaining = redis.decr(slot_key)
        next unless remaining.negative?

        # Corrupt counter: drop the key to get back to a clean slate.
        redis.del(slot_key)
        Sidekiq.logger.warn("negative_semaphore_backoff_job_count")
      end
    end
  end

  # No room: hand the slot back and try again later.
  Sidekiq.redis { |redis| redis.decr(slot_key) }
  self.class.perform_in(self.semaphore_backoff, *args)
  nil
end