4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
# File 'lib/resque/unique_by_arity/cop_modulizer.rb', line 4
# Builds an anonymous Module holding the uniqueness helper methods selected by
# +configuration+. The methods are created with +define_method+, so they close
# over +configuration+ and are meant to be called on whatever object the
# returned module is mixed into (inside them, +self+ is that receiver).
#
# What gets defined:
# * +redis_unique_hash+ - when any of +unique_in_queue+, +unique_at_runtime+ or
#   +unique_across_queues+ is truthy.
# * +solo_redis_key_prefix+, +unique_at_queue_time_redis_key+,
#   +purge_unique_queued_redis_keys+ and +solo_key_namespace+ - when
#   +unique_in_queue+ or +unique_across_queues+ is truthy.
# * +runtime_key_namespace+, +unique_at_runtime_redis_key+ and
#   +purge_unique_at_runtime_redis_keys+ - when +unique_at_runtime+ is truthy.
#
# Additionally, +@lock_after_execution_period+ and +@runtime_lock_timeout+ are
# set as instance variables on the returned module itself when the matching
# configuration values are truthy.
#
# @param configuration [#unique_in_queue, #unique_at_runtime, #unique_across_queues,
#   #arity_for_uniqueness, #lock_after_execution_period, #runtime_lock_timeout]
#   the settings object the generated methods read from
# @return [Module] a new anonymous module
def self.to_mod(configuration)
  Module.new do
    if configuration.unique_in_queue || configuration.unique_at_runtime || configuration.unique_across_queues
      # Returns [md5_hex_digest, uniqueness_args] for a job payload.
      define_method(:redis_unique_hash) do |payload|
        # Round-trip through Resque's encoder; everything below works on (and
        # mutates) the decoded copy rather than the caller's object.
        payload = Resque.decode(Resque.encode(payload))
        Resque::UniqueByArity.unique_log "payload is #{payload.inspect}" if ENV['RESQUE_DEBUG'] == 'true'
        job = payload["class"]
        args = payload["args"] || []
        # Hash arguments are replaced by their sorted key/value pairs so that
        # key order does not influence the digest.
        args.map! { |arg| arg.is_a?(Hash) ? arg.sort : arg }
        arity = configuration.arity_for_uniqueness
        # An arity of 0 must select no arguments. The plain range form would
        # turn into args[0..-1], which is the *whole* array.
        uniqueness_args = arity.zero? ? [] : args[0..(arity - 1)]
        digest_source = { class: job, args: uniqueness_args }
        [Digest::MD5.hexdigest(Resque.encode(digest_source)), uniqueness_args]
      end
    end

    # Inside Module.new's block +self+ is the module being built, so these
    # instance variables live on the module object.
    if configuration.lock_after_execution_period
      instance_variable_set(:@lock_after_execution_period, configuration.lock_after_execution_period)
    end
    if configuration.runtime_lock_timeout
      instance_variable_set(:@runtime_lock_timeout, configuration.runtime_lock_timeout)
    end

    if configuration.unique_in_queue || configuration.unique_across_queues
      # Key segment identifying the receiver.
      define_method(:solo_redis_key_prefix) do
        "unique_job:#{self}"
      end

      # Full queue-time key: "<namespace>:<prefix>:<digest>".
      define_method(:unique_at_queue_time_redis_key) do |queue, payload|
        unique_hash, args_for_uniqueness = redis_unique_hash(payload)
        key = "#{solo_key_namespace(queue)}:#{solo_redis_key_prefix}:#{unique_hash}"
        Resque::UniqueByArity.unique_log "#{ColorizedString['[Queue-Time]'].green} #{self}.unique_at_queue_time_redis_key for #{args_for_uniqueness} is: #{ColorizedString[key].green}" if ENV['RESQUE_DEBUG'] == 'true'
        key
      end

      # Deletes every queue-time key matching the receiver's namespace and
      # prefix; the queue name is read from the receiver's @queue.
      define_method(:purge_unique_queued_redis_keys) do
        key_match = "#{solo_key_namespace(instance_variable_get(:@queue))}:#{solo_redis_key_prefix}:*"
        keys = Resque.redis.keys(key_match)
        Resque::UniqueByArity.unique_log "Purging #{keys.length} keys from #{ColorizedString[key_match].red}"
        Resque.redis.del keys unless keys.empty?
      end

      # unique_in_queue takes precedence when both flags are set.
      if configuration.unique_in_queue
        define_method(:solo_key_namespace) do |queue = nil|
          "solo:queue:#{queue}:job"
        end
      elsif configuration.unique_across_queues
        # The queue argument is accepted for signature parity but ignored.
        define_method(:solo_key_namespace) do |_queue = nil|
          "solo:across_queues:job"
        end
      end
    end

    if configuration.unique_at_runtime
      # Namespace for the receiver's runtime keys.
      define_method(:runtime_key_namespace) do
        # Emitted through the debug logger (and only in debug mode) instead of
        # printing to stdout on every call.
        Resque::UniqueByArity.unique_log "runtime_key_namespace for #{self}" if ENV['RESQUE_DEBUG'] == 'true'
        "unique_at_runtime:#{self}"
      end

      # Full runtime key for the given job arguments:
      # "<runtime namespace>:<digest>".
      define_method(:unique_at_runtime_redis_key) do |*args|
        unique_hash, args_for_uniqueness = redis_unique_hash({"class" => self.to_s, "args" => args})
        key = "#{runtime_key_namespace}:#{unique_hash}"
        Resque::UniqueByArity.unique_log "#{ColorizedString['[Run-Time]'].yellow} #{self}.unique_at_runtime_redis_key for #{args_for_uniqueness} is: #{ColorizedString[key].yellow}" if ENV['RESQUE_DEBUG'] == 'true'
        key
      end

      # Deletes every runtime key under the receiver's runtime namespace.
      define_method(:purge_unique_at_runtime_redis_keys) do
        key_match = "#{runtime_key_namespace}:*"
        keys = Resque.redis.keys(key_match)
        Resque::UniqueByArity.unique_log "Purging #{keys.length} keys from #{ColorizedString[key_match].red}"
        Resque.redis.del keys unless keys.empty?
      end
    end
  end
end
|