Module: Resque::UniqueInQueue::Queue
- Defined in:
- lib/resque/unique_in_queue/queue.rb
Class Method Summary collapse
- .cleanup(queue) ⇒ Object
- .const_for(item) ⇒ Object
- .destroy(queue, klass, *args) ⇒ Object
- .is_unique?(item) ⇒ Boolean
- .item_class(item) ⇒ Object
- .item_ttl(item) ⇒ Object
- .lock_after_execution_period(item) ⇒ Object
- .mark_queued(queue, item) ⇒ Object
- .mark_unqueued(queue, job) ⇒ Object
- .queued?(queue, item) ⇒ Boolean
- .redis ⇒ Object
- .unique_key(queue, item) ⇒ Object
Class Method Details
.cleanup(queue) ⇒ Object
66 67 68 69 |
# File 'lib/resque/unique_in_queue/queue.rb', line 66 def cleanup(queue) keys = redis.keys("#{Resque::UniqueInQueue.uniq_config&.unique_in_queue_key_base}:queue:#{queue}:job:*") redis.del(*keys) if keys.any? end |
.const_for(item) ⇒ Object
81 82 83 |
# File 'lib/resque/unique_in_queue/queue.rb', line 81 def const_for(item) Resque.constantize(item_class(item)) end |
.destroy(queue, klass, *args) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/resque/unique_in_queue/queue.rb', line 53 def destroy(queue, klass, *args) klass = klass.to_s redis_queue = "queue:#{queue}" redis.lrange(redis_queue, 0, -1).each do |string| json = Resque.decode(string) next unless json['class'] == klass next if args.any? && json['args'] != args Resque::UniqueInQueue::Queue.mark_unqueued(queue, json) end end |
.is_unique?(item) ⇒ Boolean
35 36 37 38 39 |
# File 'lib/resque/unique_in_queue/queue.rb', line 35 def is_unique?(item) const_for(item).included_modules.include?(::Resque::Plugins::UniqueInQueue) rescue NameError false end |
.item_class(item) ⇒ Object
77 78 79 |
# File 'lib/resque/unique_in_queue/queue.rb', line 77 def item_class(item) item[:class] || item['class'] end |
.item_ttl(item) ⇒ Object
41 42 43 44 45 |
# File 'lib/resque/unique_in_queue/queue.rb', line 41 def item_ttl(item) const_for(item).ttl rescue NameError -1 end |
.lock_after_execution_period(item) ⇒ Object
47 48 49 50 51 |
# File 'lib/resque/unique_in_queue/queue.rb', line 47 def lock_after_execution_period(item) const_for(item).lock_after_execution_period rescue NameError 0 end |
.mark_queued(queue, item) ⇒ Object
10 11 12 13 14 15 16 17 |
# File 'lib/resque/unique_in_queue/queue.rb', line 10 def mark_queued(queue, item) return unless is_unique?(item) key = unique_key(queue, item) redis.set(key, 1) ttl = item_ttl(item) redis.expire(key, ttl) if ttl >= 0 end |
.mark_unqueued(queue, job) ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/resque/unique_in_queue/queue.rb', line 19 def mark_unqueued(queue, job) item = job.is_a?(Resque::Job) ? job.payload : job return unless is_unique?(item) ttl = lock_after_execution_period(item) if ttl == 0 redis.del(unique_key(queue, item)) else redis.expire(unique_key(queue, item), ttl) end end |
.queued?(queue, item) ⇒ Boolean
4 5 6 7 8 |
# File 'lib/resque/unique_in_queue/queue.rb', line 4 def queued?(queue, item) return false unless is_unique?(item) redis.get(unique_key(queue, item)) == '1' end |
.redis ⇒ Object
73 74 75 |
# File 'lib/resque/unique_in_queue/queue.rb', line 73 def redis Resque.redis end |
.unique_key(queue, item) ⇒ Object
31 32 33 |
# File 'lib/resque/unique_in_queue/queue.rb', line 31 def unique_key(queue, item) const_for(item).unique_in_queue_redis_key(queue, item) end |