Class: Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob

Inherits:
Object
  • Object
show all
Includes:
Utils::StrongMemoize
Defined in:
lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb

Overview

This class defines an identifier for a job in a queue. The identifier is based on the job's class and arguments.

A strategy decides when to keep track of the job in Redis and when to remove it.

The deduplication key is stored in Redis by calling `check!`. It returns the `jid` of the job if this job was scheduled, or the `jid` of the duplicate job if one was already scheduled.

When new jobs can be scheduled again, the strategy calls `#delete`.

Constant Summary collapse

# Fallback expiry for the idempotency key and the deduplicated flag: returned by
# #duplicate_key_ttl (the default `expiry` of #check! and #set_deduplicated_flag!)
# when the worker's deduplication options do not set :ttl.
DEFAULT_DUPLICATE_KEY_TTL =
6.hours
# Passed as the ttl (ARGV[3]) to LUA_SET_WAL_SCRIPT by #update_latest_wal_location!.
# The script applies it with EXPIRE only when it first creates a WAL-location list.
WAL_LOCATION_TTL =
60.seconds
# Not referenced by the methods in this view.
# NOTE(review): presumably caps Redis retry attempts elsewhere in the class — confirm.
MAX_REDIS_RETRIES =
5
# NOTE(review): neither strategy constant below is referenced by the methods in
# this view. They are presumably the fallback strategy and the "no deduplication"
# strategy name consulted by #strategy (not shown) — confirm.
DEFAULT_STRATEGY =
:until_executing
STRATEGY_NONE =
:none
# Value written under the deduplicated flag key by #set_deduplicated_flag!.
# #should_reschedule? only checks that some value is present.
DEDUPLICATED_FLAG_VALUE =
1
# Records the latest WAL location for one connection in a two-element Redis list
# [wal, offset] stored at KEYS[1].
#   KEYS[1] - list key; ARGV[1] - WAL location; ARGV[2] - numeric offset; ARGV[3] - ttl
# If the list does not exist (LINDEX returns a nil reply, which Lua sees as false),
# it is created with RPUSH and given the ttl via EXPIRE. Otherwise both elements are
# overwritten only when the new offset is numerically greater than the stored one.
# The expiry is not refreshed on that update path.
LUA_SET_WAL_SCRIPT =
<<~EOS
  local key, wal, offset, ttl = KEYS[1], ARGV[1], tonumber(ARGV[2]), ARGV[3]
  local existing_offset = redis.call("LINDEX", key, -1)
  if existing_offset == false then
    redis.call("RPUSH", key, wal, offset)
    redis.call("EXPIRE", key, ttl)
  elseif offset > tonumber(existing_offset) then
    redis.call("LSET", key, 0, wal)
    redis.call("LSET", key, -1, offset)
  end
EOS

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Utils::StrongMemoize

#clear_memoization, #strong_memoize, #strong_memoized?

Constructor Details

#initialize(job, queue_name) ⇒ DuplicateJob

Returns a new instance of DuplicateJob.


43
44
45
46
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 43

# Wraps one job so its deduplication state can be checked and managed.
#
# @param job [Hash] the job payload; other methods index it with string keys
#   such as 'at' and write 'idempotency_key' into it
# @param queue_name presumably the name of the queue the job was pushed to
#   (not read by the methods in this view) — confirm
def initialize(job, queue_name)
  @queue_name = queue_name
  @job = job
end

Instance Attribute Details

#existing_jid ⇒ Object

Returns the value of attribute existing_jid.


41
42
43
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 41

# The jid read back from Redis by #check! (nil until #check! has assigned it).
def existing_jid
  @existing_jid if defined?(@existing_jid)
end

Instance Method Details

#check!(expiry = duplicate_key_ttl) ⇒ Object

This method returns the jid that is stored in Redis under the idempotency key.


62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 62

# Claims the idempotency key for this job and reads back the jid that owns it.
#
# A single MULTI transaction does three things in order:
# 1. SET the key to this job's jid with NX, so it is written only if absent.
# 2. Let #check_existing_wal_locations! queue its own commands, which return a
#    hash of futures.
# 3. GET the key.
# The method also stamps job['idempotency_key'].
#
# @param expiry TTL for the idempotency key; defaults to #duplicate_key_ttl
# @return the jid stored under the idempotency key. This is this job's jid when
#   its SET succeeded, otherwise the jid of the job already holding the key.
def check!(expiry = duplicate_key_ttl)
  jid_future = nil
  wal_futures = {}

  Sidekiq.redis do |redis|
    redis.multi do |transaction|
      transaction.set(idempotency_key, jid, ex: expiry, nx: true)
      wal_futures = check_existing_wal_locations!(transaction, expiry)
      jid_future = transaction.get(idempotency_key)
    end
  end

  job['idempotency_key'] = idempotency_key

  # Commands queued inside MULTI yield futures; their values can only be read
  # after the transaction block has finished executing.
  self.existing_wal_locations = wal_futures.transform_values { |future| future.value }
  self.existing_jid = jid_future.value
end

#delete! ⇒ Object


115
116
117
118
119
120
121
122
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 115

# Clears this job's deduplication state in one MULTI transaction.
#
# The transaction deletes the idempotency key and the deduplicated flag key,
# then runs whatever #delete_wal_locations! queues (presumably the stored
# WAL-location keys). Per the class docs, a strategy calls this once new jobs
# may be scheduled again.
#
# @return whatever Sidekiq.redis returns for the block
def delete!
  Sidekiq.redis do |connection|
    connection.multi do |transaction|
      transaction.del(idempotency_key, deduplicated_flag_key)
      delete_wal_locations!(transaction)
    end
  end
end

#duplicate? ⇒ Boolean

Returns:

  • (Boolean)

134
135
136
137
138
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 134

# Whether a different job already holds the idempotency key.
#
# @raise [RuntimeError] if #check! has not yet populated #existing_jid
# @return [Boolean] true when the stored jid differs from this job's jid
def duplicate?
  owner_jid = existing_jid
  raise "Call `#check!` first to check for existing duplicates" unless owner_jid

  jid != owner_jid
end

#duplicate_key_ttl ⇒ Object


174
175
176
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 174

# Expiry used for the idempotency key and the deduplicated flag.
#
# @return the worker's configured :ttl deduplication option when it is set
#   (not nil/false), otherwise DEFAULT_DUPLICATE_KEY_TTL
def duplicate_key_ttl
  configured_ttl = options[:ttl]
  configured_ttl || DEFAULT_DUPLICATE_KEY_TTL
end

#idempotent? ⇒ Boolean

Returns:

  • (Boolean)

167
168
169
170
171
172
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 167

# Whether the job's worker class declares itself idempotent.
#
# @return false when there is no worker class or it does not respond to
#   `idempotent?`; otherwise whatever `worker_klass.idempotent?` returns
def idempotent?
  klass = worker_klass
  return false unless klass && klass.respond_to?(:idempotent?)

  klass.idempotent?
end

#latest_wal_locations ⇒ Object


97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 97

# Latest WAL location stored in Redis for each connection named in this job's
# WAL locations.
#
# All reads happen in one MULTI transaction (LINDEX <key> 0 per connection).
# Connections with nothing stored are dropped from the result. The result is
# memoized via strong_memoize.
#
# @return [Hash] connection name => stored WAL location; {} when the job
#   carries no WAL locations
def latest_wal_locations
  return {} unless job_wal_locations.present?

  strong_memoize(:latest_wal_locations) do
    futures_by_connection = {}

    Sidekiq.redis do |redis|
      redis.multi do |transaction|
        job_wal_locations.each_key do |connection_name|
          # Index 0 of the list holds the WAL location (LUA_SET_WAL_SCRIPT pushes wal, then offset).
          futures_by_connection[connection_name] = transaction.lindex(wal_location_key(connection_name), 0)
        end
      end
    end

    futures_by_connection.transform_values(&:value).compact
  end
end

#options ⇒ Object


160
161
162
163
164
165
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 160

# Deduplication options declared by the worker class.
#
# @return [Hash] the result of `worker_klass.get_deduplication_options`, or {}
#   when there is no worker class or it does not respond to that method.
#   #duplicate_key_ttl reads :ttl from it.
def options
  klass = worker_klass
  return {} unless klass && klass.respond_to?(:get_deduplication_options)

  klass.get_deduplication_options
end

#perform(&block) ⇒ Object

This will continue the server middleware chain if the job should be executed. It will return false if the job should not be executed.


57
58
59
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 57

# Hands execution of the job to the strategy class selected by #strategy.
#
# Per the class docs, this continues the server middleware chain (the block)
# when the job should run, and returns false when it should not.
def perform(&block)
  strategy_class = Strategies.for(strategy)
  strategy_class.new(self).perform(job, &block)
end

#reschedule ⇒ Object


124
125
126
127
128
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 124

# Logs the reschedule through the deduplication logger, then enqueues a fresh
# job for the same worker with the same arguments.
#
# @return whatever `worker_klass.perform_async` returns
def reschedule
  dedup_logger = Gitlab::SidekiqLogging::DeduplicationLogger.instance
  dedup_logger.rescheduled_log(job)

  worker_klass.perform_async(*arguments)
end

#schedule(&block) ⇒ Object

This will continue the middleware chain if the job should be scheduled. It will return false if the job needs to be cancelled.


50
51
52
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 50

# Hands scheduling of the job to the strategy class selected by #strategy.
#
# Per the class docs, this continues the client middleware chain (the block)
# when the job should be scheduled, and returns false when it should be
# cancelled.
def schedule(&block)
  strategy_class = Strategies.for(strategy)
  strategy_class.new(self).schedule(job, &block)
end

#scheduled? ⇒ Boolean

Returns:

  • (Boolean)

130
131
132
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 130

# Whether the job payload carries a run time, i.e. #scheduled_at is present.
#
# @return [Boolean]
def scheduled?
  run_at = scheduled_at
  run_at.present?
end

#scheduled_at ⇒ Object


156
157
158
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 156

# Run time recorded in the job payload under the 'at' key.
#
# @return the payload's 'at' value, or nil when the job has none
def scheduled_at
  job['at']
end

#set_deduplicated_flag!(expiry = duplicate_key_ttl) ⇒ Object


140
141
142
143
144
145
146
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 140

# Marks in Redis that a duplicate of this job was dropped, so the job can be
# rescheduled later. Does nothing unless the job is reschedulable.
#
# The flag is written with NX, so an existing flag and its expiry are left
# untouched.
#
# @param expiry TTL for the flag; defaults to #duplicate_key_ttl
# @return nil when not reschedulable, otherwise the result of the Redis SET
def set_deduplicated_flag!(expiry = duplicate_key_ttl)
  return unless reschedulable?

  Sidekiq.redis do |connection|
    connection.set(
      deduplicated_flag_key,
      DEDUPLICATED_FLAG_VALUE,
      ex: expiry,
      nx: true
    )
  end
end

#should_reschedule? ⇒ Boolean

Returns:

  • (Boolean)

148
149
150
151
152
153
154
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 148

# Whether a duplicate of this job was dropped while it was pending, meaning
# the deduplicated flag is present in Redis. Always false for jobs that are
# not reschedulable.
#
# @return [Boolean]
def should_reschedule?
  return false unless reschedulable?

  flag = Sidekiq.redis { |connection| connection.get(deduplicated_flag_key) }
  flag.present?
end

#update_latest_wal_location! ⇒ Object


81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 81

# Stores this job's WAL location for each connection in Redis.
#
# All writes happen in one MULTI transaction, running LUA_SET_WAL_SCRIPT once
# per connection with the location, the integer offset from pg_wal_lsn_diff,
# and WAL_LOCATION_TTL. Does nothing when the job carries no WAL locations.
def update_latest_wal_location!
  return unless job_wal_locations.present?

  Sidekiq.redis do |redis|
    redis.multi do |transaction|
      job_wal_locations.each do |connection_name, location|
        offset = pg_wal_lsn_diff(connection_name).to_i

        transaction.eval(
          LUA_SET_WAL_SCRIPT,
          keys: [wal_location_key(connection_name)],
          argv: [location, offset, WAL_LOCATION_TTL]
        )
      end
    end
  end
end