4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
# File 'lib/resque/unique_by_arity/modulizer.rb', line 4
def self.to_mod(configuration)
Module.new do
if configuration.unique_in_queue || configuration.unique_at_runtime || configuration.unique_across_queues
define_method(:redis_unique_hash) do |payload|
payload = Resque.decode(Resque.encode(payload))
Resque::UniqueByArity.unique_debug("payload is #{payload.inspect}")
job = payload['class']
args = payload['args'] || []
args.map! do |arg|
arg.is_a?(Hash) ? arg.sort : arg
end
uniqueness_args = if configuration.arity_for_uniqueness.zero?
[]
else
args[0..(configuration.arity_for_uniqueness - 1)]
end
args = { class: job, args: uniqueness_args }
return [Digest::MD5.hexdigest(Resque.encode(args)), uniqueness_args]
end
end
if configuration.lock_after_execution_period
instance_variable_set(:@lock_after_execution_period, configuration.lock_after_execution_period)
end
if configuration.runtime_lock_timeout
instance_variable_set(:@runtime_lock_timeout, configuration.runtime_lock_timeout)
end
if configuration.runtime_requeue_interval
instance_variable_set(:@runtime_requeue_interval, configuration.runtime_requeue_interval)
end
if configuration.unique_at_runtime_key_base
instance_variable_set(:@unique_at_runtime_key_base, configuration.unique_at_runtime_key_base)
end
if configuration.unique_in_queue_key_base
Resque::UniqueInQueue.uniq_config&.unique_in_queue_key_base = configuration.unique_in_queue_key_base
end
if configuration.unique_in_queue || configuration.unique_across_queues
define_method(:unique_at_runtime_redis_key_prefix) do
"unique_job:#{self}"
end
define_method(:unique_in_queue_redis_key) do |queue, payload|
unique_hash, args_for_uniqueness = redis_unique_hash(payload)
key = "#{unique_at_runtime_key_namespace(queue)}:#{unique_at_runtime_redis_key_prefix}:#{unique_hash}"
Resque::UniqueByArity.unique_debug("#{self}.unique_in_queue_redis_key for #{args_for_uniqueness} is: #{ColorizedString[key].green}")
key
end
define_method(:purge_unique_queued_redis_keys) do
key_match = "#{unique_at_runtime_key_namespace(instance_variable_get(:@queue))}:#{unique_at_runtime_redis_key_prefix}:*"
keys = Resque.redis.keys(key_match)
Resque::UniqueByArity.unique_log("#{Resque::UniqueByArity::PLUGIN_TAG}#{Resque::UniqueInQueue::PLUGIN_TAG} Purging #{keys.length} keys from #{ColorizedString[key_match].red}")
Resque.redis.del keys unless keys.empty?
end
if configuration.unique_in_queue
define_method(:unique_at_runtime_key_namespace) do |queue = nil|
"#{configuration.unique_in_queue_key_base}:queue:#{queue}:job"
end
elsif configuration.unique_across_queues
define_method(:unique_at_runtime_key_namespace) do |_queue = nil|
"#{configuration.unique_in_queue_key_base}:across_queues:job"
end
end
end
if configuration.unique_at_runtime
define_method(:runtime_key_namespace) do
"#{configuration.unique_at_runtime_key_base}:#{self}"
end
define_method(:unique_at_runtime_redis_key) do |*args|
unique_hash, args_for_uniqueness = redis_unique_hash('class' => to_s, 'args' => args)
key = "#{runtime_key_namespace}:#{unique_hash}"
Resque::UniqueByArity.unique_debug("#{ColorizedString['[R-UAR]'].yellow} #{self}.unique_at_runtime_redis_key for #{args_for_uniqueness} is: #{ColorizedString[key].yellow}")
key
end
define_method(:purge_unique_at_runtime_redis_keys) do
key_match = "#{runtime_key_namespace}:*"
keys = Resque.redis.keys(key_match)
Resque::UniqueByArity.unique_log("#{ColorizedString['[R-UBA][R-UAR]'].red} Purging #{keys.length} keys from #{ColorizedString[key_match].red}")
Resque.redis.del keys unless keys.empty?
end
end
end
end
|