Class: Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob
- Inherits:
-
Object
- Object
- Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob
show all
- Includes:
- Utils::StrongMemoize
- Defined in:
- lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb
Overview
This class defines an identifier of a job in a queue. The identifier is based on a job's class and arguments.
A strategy decides when to keep track of the job in Redis and when to remove it.
Storing the deduplication key in Redis can be done by calling `#check!`. `#check!` returns the `jid` of the job if it was scheduled, or the `jid` of the duplicate job if it was already scheduled.
When new jobs can be scheduled again, the strategy calls `#delete!`.
Constant Summary
collapse
- DEFAULT_DUPLICATE_KEY_TTL =
6.hours
- WAL_LOCATION_TTL =
60.seconds
- MAX_REDIS_RETRIES =
5
- DEFAULT_STRATEGY =
:until_executing
- STRATEGY_NONE =
:none
- DEDUPLICATED_FLAG_VALUE =
1
- LUA_SET_WAL_SCRIPT =
<<~EOS
local key, wal, offset, ttl = KEYS[1], ARGV[1], tonumber(ARGV[2]), ARGV[3]
local existing_offset = redis.call("LINDEX", key, -1)
if existing_offset == false then
redis.call("RPUSH", key, wal, offset)
redis.call("EXPIRE", key, ttl)
elseif offset > tonumber(existing_offset) then
redis.call("LSET", key, 0, wal)
redis.call("LSET", key, -1, offset)
end
EOS
Instance Attribute Summary collapse
Instance Method Summary
collapse
#clear_memoization, #strong_memoize, #strong_memoized?
Constructor Details
#initialize(job, queue_name) ⇒ DuplicateJob
Returns a new instance of DuplicateJob.
43
44
45
46
|
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 43
# Wraps a job so that duplicates of it can be tracked.
#
# @param job [Hash] the job payload; other methods of this class index it
#   with string keys such as 'at' and 'idempotency_key'
# @param queue_name [Object] stored as-is in `@queue_name`
def initialize(job, queue_name)
  @queue_name = queue_name
  @job = job
end
|
Instance Attribute Details
#existing_jid ⇒ Object
Returns the value of attribute existing_jid.
41
42
43
|
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 41
# Reader for the `@existing_jid` instance variable.
#
# `#check!` assigns this attribute (via `self.existing_jid=`) with the jid it
# read back from Redis, and `#duplicate?` refuses to run while it is unset.
#
# @return [Object, nil] the stored value, or nil when nothing has been assigned yet
def existing_jid
@existing_jid
end
|
Instance Method Details
#check!(expiry = duplicate_key_ttl) ⇒ Object
This method will return the jid that was set in redis
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 62
# Tries to claim the idempotency key for this job and records what Redis holds
# for it afterwards.
#
# Inside a single MULTI the method:
#   1. writes this job's jid under the idempotency key with `nx: true`
#      (so an already existing key is left alone) and the given expiry,
#   2. lets `check_existing_wal_locations!` queue its own commands,
#   3. reads the idempotency key back.
#
# Afterwards the job hash gets an 'idempotency_key' entry and the results are
# stored through `existing_wal_locations=` and `existing_jid=`.
#
# @param expiry [Object] TTL passed to Redis as `ex:`; defaults to `duplicate_key_ttl`
# @return [Object] the jid read back from Redis for the idempotency key
def check!(expiry = duplicate_key_ttl)
  jid_future = nil
  wal_futures = {}

  Sidekiq.redis do |conn|
    conn.multi do |transaction|
      transaction.set(idempotency_key, jid, ex: expiry, nx: true)
      wal_futures = check_existing_wal_locations!(transaction, expiry)
      jid_future = transaction.get(idempotency_key)
    end
  end

  job['idempotency_key'] = idempotency_key

  # The objects collected inside the transaction are only asked for their
  # `#value` once the MULTI block has finished.
  self.existing_wal_locations = wal_futures.transform_values(&:value)
  self.existing_jid = jid_future.value
end
|
#delete! ⇒ Object
115
116
117
118
119
120
121
122
|
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 115
# Clears the deduplication state of this job in one Redis MULTI: the
# idempotency key and the deduplicated flag key are deleted, then
# `delete_wal_locations!` queues its own commands on the same transaction.
#
# @return [Object] whatever `Sidekiq.redis` returns for the block
def delete!
  Sidekiq.redis do |conn|
    conn.multi do |transaction|
      transaction.del(idempotency_key, deduplicated_flag_key)
      delete_wal_locations!(transaction)
    end
  end
end
|
#duplicate? ⇒ Boolean
134
135
136
137
138
|
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 134
# Tells whether another job already owns the idempotency key.
#
# @return [Boolean] true when this job's jid differs from `existing_jid`
# @raise [RuntimeError] when `existing_jid` is not set, i.e. `#check!` has
#   not populated it yet
def duplicate?
  recorded_jid = existing_jid
  raise "Call `#check!` first to check for existing duplicates" unless recorded_jid

  jid != recorded_jid
end
|
#duplicate_key_ttl ⇒ Object
174
175
176
|
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 174
# TTL used for the deduplication keys.
#
# @return [Object] the `:ttl` entry of `options` when it is truthy,
#   otherwise DEFAULT_DUPLICATE_KEY_TTL
def duplicate_key_ttl
  configured_ttl = options[:ttl]
  return configured_ttl if configured_ttl

  DEFAULT_DUPLICATE_KEY_TTL
end
|
#idempotent? ⇒ Boolean
167
168
169
170
171
172
|
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 167
# Asks the worker class whether the job is idempotent.
#
# @return [Boolean] false when there is no worker class or when it does not
#   respond to `idempotent?`; otherwise whatever the worker class answers
def idempotent?
  klass = worker_klass
  klass&.respond_to?(:idempotent?) ? klass.idempotent? : false
end
|
#latest_wal_locations ⇒ Object
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 97
# Reads, for every connection named in `job_wal_locations`, the first element
# of the list stored under that connection's WAL location key.
#
# The lookups are issued inside one MULTI and the result is memoized under
# `:latest_wal_locations`. Connections whose lookup yields nil are dropped.
#
# @return [Hash] connection name => value read from Redis; `{}` when the job
#   carries no WAL locations
def latest_wal_locations
  return {} unless job_wal_locations.present?

  strong_memoize(:latest_wal_locations) do
    futures = {}

    Sidekiq.redis do |conn|
      conn.multi do |transaction|
        job_wal_locations.each_key do |connection_name|
          # Index 0 is where LUA_SET_WAL_SCRIPT puts the WAL location
          # (the offset sits at the tail of the list).
          futures[connection_name] = transaction.lindex(wal_location_key(connection_name), 0)
        end
      end
    end

    futures.transform_values(&:value).compact
  end
end
|
#options ⇒ Object
160
161
162
163
164
165
|
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 160
# Deduplication options declared by the worker class.
#
# @return [Object] the result of `worker_klass.get_deduplication_options`, or
#   `{}` when there is no worker class or it does not respond to that method
def options
  klass = worker_klass
  return klass.get_deduplication_options if klass&.respond_to?(:get_deduplication_options)

  {}
end
|
This will continue the server middleware chain if the job should be executed. It will return false if the job should not be executed.
57
58
59
|
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 57
# Hands the job to the strategy selected by `strategy` for the server-side
# (execution) step.
#
# @yield forwarded to the strategy's `perform`
# @return [Object] whatever the strategy's `perform` returns
def perform(&block)
  strategy_class = Strategies.for(strategy)
  strategy_class.new(self).perform(job, &block)
end
|
#reschedule ⇒ Object
124
125
126
127
128
|
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 124
# Logs the reschedule through the deduplication logger, then enqueues the
# worker again with this job's arguments.
#
# @return [Object] the result of `worker_klass.perform_async`
def reschedule
  dedup_logger = Gitlab::SidekiqLogging::DeduplicationLogger.instance
  dedup_logger.rescheduled_log(job)

  worker_klass.perform_async(*arguments)
end
|
#schedule(&block) ⇒ Object
This will continue the middleware chain if the job should be scheduled. It will return false if the job needs to be cancelled.
50
51
52
|
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 50
# Hands the job to the strategy selected by `strategy` for the scheduling step.
#
# @yield forwarded to the strategy's `schedule`
# @return [Object] whatever the strategy's `schedule` returns
def schedule(&block)
  strategy_class = Strategies.for(strategy)
  strategy_class.new(self).schedule(job, &block)
end
|
#scheduled? ⇒ Boolean
130
131
132
|
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 130
# Whether the job carries a scheduled time.
#
# @return [Boolean] the result of `present?` on `scheduled_at`, which reads
#   the job's 'at' entry
def scheduled?
scheduled_at.present?
end
|
#scheduled_at ⇒ Object
156
157
158
|
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 156
# Reads the 'at' entry of the job hash.
#
# NOTE(review): presumably the time the job is scheduled to run at, as set by
# Sidekiq for delayed jobs; confirm against the Sidekiq job payload format.
#
# @return [Object, nil] the value stored under 'at'
def scheduled_at
job['at']
end
|
#set_deduplicated_flag!(expiry = duplicate_key_ttl) ⇒ Object
140
141
142
143
144
145
146
|
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 140
# Marks in Redis that a duplicate of this job was dropped. Does nothing unless
# the job is `reschedulable?`.
#
# The flag key is written with `nx: true`, so an already present flag is left
# untouched, and expires after `expiry`.
#
# @param expiry [Object] TTL passed to Redis as `ex:`; defaults to `duplicate_key_ttl`
# @return [Object, nil] nil when not reschedulable, otherwise the result of
#   the `Sidekiq.redis` block
def set_deduplicated_flag!(expiry = duplicate_key_ttl)
  return unless reschedulable?

  Sidekiq.redis do |conn|
    conn.set(deduplicated_flag_key, DEDUPLICATED_FLAG_VALUE, ex: expiry, nx: true)
  end
end
|
#should_reschedule? ⇒ Boolean
148
149
150
151
152
153
154
|
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 148
# Whether the job should be enqueued again: it must be `reschedulable?` and
# the deduplicated flag key must hold a non-blank value in Redis.
#
# @return [Boolean]
def should_reschedule?
  return false unless reschedulable?

  flag = Sidekiq.redis { |conn| conn.get(deduplicated_flag_key) }
  flag.present?
end
|
#update_latest_wal_location! ⇒ Object
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
# File 'lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job.rb', line 81
# Pushes this job's WAL locations to Redis, one LUA_SET_WAL_SCRIPT evaluation
# per connection, all queued in a single MULTI.
#
# For each connection the script receives the connection's WAL location key,
# the job's location, the integer result of `pg_wal_lsn_diff` for that
# connection, and WAL_LOCATION_TTL. Per the script, the stored pair is only
# replaced when the new offset is greater than the stored one.
#
# @return [Object, nil] nil when the job carries no WAL locations
def update_latest_wal_location!
  return unless job_wal_locations.present?

  Sidekiq.redis do |conn|
    conn.multi do |transaction|
      job_wal_locations.each_pair do |connection_name, location|
        script_keys = [wal_location_key(connection_name)]
        script_args = [location, pg_wal_lsn_diff(connection_name).to_i, WAL_LOCATION_TTL]

        transaction.eval(LUA_SET_WAL_SCRIPT, keys: script_keys, argv: script_args)
      end
    end
  end
end
|