Module: Resque::Integration::Unique

Defined in:
lib/resque/integration/unique.rb

Overview

Unique job

Examples:

class MyJob
  include Resque::Integration

  # Jobs are considered equal if their first argument is the same:
  # the block receives the job arguments and returns the part used for uniqueness.
  unique { |*args| args.first }

  # Job body, invoked with the job arguments.
  def self.execute(image_id)
    # do the actual work here
  end
end

# Enqueue the job with 11 as its first (uniqueness-defining) argument.
MyJob.enqueue(11)

Defined Under Namespace

Modules: Overrides

Constant Summary collapse

LOCK_TIMEOUT =

3 days

259_200

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.extended(base) ⇒ Object



26
27
28
29
30
31
32
33
# File 'lib/resque/integration/unique.rb', line 26

# Hook run when a job class extends this module.
#
# Refuses to continue when the Priority module is already part of the
# class's singleton class, then extends the class with progress support
# and prepends Overrides to its singleton class.
#
# @param base [Class] the class being extended
# @raise [RuntimeError] if Priority was mixed in before uniqueness
def self.extended(base)
  eigenclass = base.singleton_class
  raise 'Uniqueness should be enabled before Prioritness' if eigenclass.include?(::Resque::Integration::Priority)

  base.extend(::Resque::Plugins::Progress)
  eigenclass.prepend(Overrides)
end

Instance Method Details

#after_dequeue_lock(_meta_id, *args) ⇒ Object

When job is dequeued we should remove lock



146
147
148
# File 'lib/resque/integration/unique.rb', line 146

# After a job is dequeued, release its lock.
#
# The leading meta id is ignored; nothing happens when no job
# arguments follow it.
def after_dequeue_lock(_meta_id, *args)
  return if args.empty?

  unlock(*args)
end

#after_dequeue_meta(*args) ⇒ Object

Mark the metadata as failed if the dequeue succeeds



151
152
153
154
155
# File 'lib/resque/integration/unique.rb', line 151

# After a dequeue, mark the job's metadata as failed.
#
# The first argument is treated as the meta id. Nothing is done when it is
# missing or when no metadata is found for it.
def after_dequeue_meta(*args)
  dequeued_id = args.first
  return unless dequeued_id

  metadata = get_meta(dequeued_id)
  metadata.fail! if metadata
end

#around_perform_lock(_meta_id, *args) ⇒ Object



138
139
140
141
142
143
# File 'lib/resque/integration/unique.rb', line 138

# Wrap job execution so that the lock is released afterwards.
#
# The leading meta id is ignored; the remaining arguments identify the lock.
# Returns whatever the wrapped block returns.
def around_perform_lock(_meta_id, *args)
  begin
    yield
  ensure
    # Release the lock on every exit path, including when the job raises.
    unlock(*args)
  end
end

#before_dequeue_lock(*args) ⇒ Object

Before dequeue check if job is running



116
117
118
119
120
# File 'lib/resque/integration/unique.rb', line 116

# Before dequeue, check whether the job is currently running.
#
# The first argument is treated as the meta id. The result is falsy when
# the id is missing, when no metadata exists for it, or when the metadata
# reports the job as working; otherwise it is true.
def before_dequeue_lock(*args)
  job_id = args.first
  return job_id unless job_id

  job_meta = get_meta(job_id)
  return job_meta unless job_meta

  !job_meta.working?
end

#before_enqueue_lock(_meta_id, *args) ⇒ Object

Before enqueue acquire a lock

Returns boolean



134
135
136
# File 'lib/resque/integration/unique.rb', line 134

# Before enqueue, try to acquire the lock for the given job arguments.
#
# Sets the lock key to 1 with `nx: true` and an expiry of `lock_timeout`.
# The leading meta id is ignored.
#
# @return the reply of the Redis `set` call
def before_enqueue_lock(_meta_id, *args)
  redis = ::Resque.redis
  lock_key = lock_id(*args)

  redis.set(lock_key, 1, ex: lock_timeout, nx: true)
end

#dequeue(*args) ⇒ Object

Dequeue unique job



173
174
175
# File 'lib/resque/integration/unique.rb', line 173

# Dequeue a unique job.
#
# The payload handed to Resque consists of the meta id computed from the
# arguments, followed by the arguments themselves.
def dequeue(*args)
  job_meta_id = meta_id(*args)

  ::Resque.dequeue(self, job_meta_id, *args)
end

#enqueue_to(queue, *args) ⇒ Object



177
178
179
180
181
182
183
184
185
186
# File 'lib/resque/integration/unique.rb', line 177

# Enqueue the job into +queue+ unless an equal job is already enqueued
# or in process.
#
# The metadata id is computed with `meta_id(*args)`, the same call shape
# used by `enqueued?` and `dequeue`, so the metadata saved here can be
# found again by those methods.
#
# @param queue the queue to put the job into
# @param args the job arguments
# @return the metadata of the already enqueued job when there is one,
#   otherwise the newly saved metadata
def enqueue_to(queue, *args)
  meta = enqueued?(*args)
  return meta if meta.present?

  # Splat the arguments: passing the array itself would yield a different id
  # than the one `enqueued?` and `dequeue` look for.
  meta = ::Resque::Plugins::Meta::Metadata.new('meta_id' => meta_id(*args), 'job_class' => to_s)
  meta.save

  ::Resque.enqueue_to(queue, self, meta.meta_id, *args)
  meta
end

#enqueued?(*args) ⇒ Boolean

Is job already in queue or in process?

Returns:

  • (Boolean)


158
159
160
161
# File 'lib/resque/integration/unique.rb', line 158

# Is the job already in the queue or in process?
#
# @return the job's metadata while its lock exists (i.e. has not timed out),
#   nil otherwise
def enqueued?(*args)
  return unless locked?(*args)

  get_meta(meta_id(*args))
end

#execute(*) ⇒ Object

Raises:

  • (NotImplementedError)


106
107
108
# File 'lib/resque/integration/unique.rb', line 106

# Placeholder for the job body; accepts and ignores any arguments.
#
# @raise [NotImplementedError] always, until the including job defines its own `execute`
def execute(*)
  raise NotImplementedError, "You should implement `execute' method"
end

#lock_id(*args) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

LockID should be independent from MetaID



89
90
91
92
93
94
# File 'lib/resque/integration/unique.rb', line 89

# Build the lock key for the given job arguments (private API).
#
# The lock id is independent of the meta id: Hash arguments are converted
# with `with_indifferent_access`, the result is narrowed by the `lock_on`
# proc, and the SHA1 digest of its string form is appended to the class name.
#
# @return [String] a key of the form "lock:<name>-<sha1 hex digest>"
def lock_id(*args)
  normalized = args.map do |arg|
    arg.is_a?(Hash) ? arg.with_indifferent_access : arg
  end
  unique_part = lock_on.call(*normalized)
  digest = ::Digest::SHA1.hexdigest(obj_to_string(unique_part))

  "lock:#{name}-#{digest}"
end

#lock_on(&block) ⇒ Object

Get or set proc returning unique arguments



79
80
81
82
83
84
85
# File 'lib/resque/integration/unique.rb', line 79

# Get or set the proc that returns the unique arguments.
#
# With a block, the block is stored and returned. Without one, the stored
# proc is returned, defaulting to a proc that returns all of its arguments.
def lock_on(&block)
  return @unique = block if block

  @unique ||= proc { |*job_args| job_args }
end

#lock_timeout ⇒ Object



163
164
165
# File 'lib/resque/integration/unique.rb', line 163

# Lifetime of the lock, used as the `ex:` expiry when the lock is set.
# Returns LOCK_TIMEOUT (259_200, i.e. 3 days).
def lock_timeout
  LOCK_TIMEOUT
end

#locked?(*args) ⇒ Boolean

Returns true if resque job is in locked state

Returns:

  • (Boolean)


168
169
170
# File 'lib/resque/integration/unique.rb', line 168

# Returns true if the resque job is in a locked state, i.e. its lock key
# exists in Redis.
#
# The reply of `exists` is normalized to a strict boolean. Depending on the
# Redis client version the reply is either true/false or an Integer count
# of existing keys; an Integer 0 is truthy in Ruby and would otherwise be
# reported as "locked".
#
# @return [Boolean]
def locked?(*args)
  found = ::Resque.redis.exists(lock_id(*args))
  return found.positive? if found.is_a?(Integer)

  found ? true : false
end

#meta ⇒ Object

get meta object associated with job



97
98
99
# File 'lib/resque/integration/unique.rb', line 97

# Get the meta object associated with the job, looked up by the id
# stored in @meta_id.
def meta
  get_meta(@meta_id)
end

#on_failure_lock(_e, _meta_id, *args) ⇒ Object

When the job fails, we should remove the lock



111
112
113
# File 'lib/resque/integration/unique.rb', line 111

# When the job fails, remove its lock.
#
# The exception and the meta id are ignored; the remaining job arguments
# are forwarded to `unlock`.
def on_failure_lock(_e, _meta_id, *args)
  unlock(*args)
end

#on_failure_retry(exception, *args) ⇒ Object



122
123
124
125
126
127
128
129
# File 'lib/resque/integration/unique.rb', line 122

# Failure hook that delegates to a parent `on_failure_retry`, if there is one.
#
# When the failure is a ::Resque::DirtyExit, the first argument is remembered
# in @meta_id before delegating. Returns nil when no parent implementation
# exists.
def on_failure_retry(exception, *args)
  return unless defined?(super)

  # Keep meta_id if the worker was killed (kill -9 or ABRT).
  dirty_exit = exception.is_a?(::Resque::DirtyExit)
  @meta_id = args.first if dirty_exit

  # Bare super forwards the original exception and arguments unchanged.
  super
end

#perform(meta_id, *args) ⇒ Object

Default `perform` method override



102
103
104
# File 'lib/resque/integration/unique.rb', line 102

# Default `perform` override: the leading meta id is not used here and the
# remaining arguments are forwarded to `execute`.
def perform(meta_id, *args)
  execute(*args)
end

#retry_args(meta_id, *args) ⇒ Object

resque-retry calls this method when it schedules a delayed job; here we remove meta_id from the arguments



63
64
65
# File 'lib/resque/integration/unique.rb', line 63

# Arguments used when the job is re-enqueued for a retry: the job arguments
# without the leading meta_id.
def retry_args(meta_id, *args)
  args
end

#retry_identifier(*args) ⇒ Object

resque-retry calls this method when it writes/reads the number of retries

- while the worker is running, meta_id is passed as the first argument;
- when reading from the web interface, meta_id is not passed, because it is dropped when the job is
re-enqueued (see retry_args);
- if the method is called from user code (and @meta_id is absent), meta_id must not be passed.


72
73
74
75
76
# File 'lib/resque/integration/unique.rb', line 72

# Identifier under which the retry count of a job is stored.
#
# Returns nil for an empty argument list. When @meta_id is a non-empty
# String equal to the first argument, that argument is dropped before the
# lock id of the remaining arguments is computed.
def retry_identifier(*args)
  return if args.empty?

  leading_meta_id = @meta_id.is_a?(String) && !@meta_id.empty? && @meta_id == args.first
  identifier_args = leading_meta_id ? args.drop(1) : args

  lock_id(*identifier_args)
end

#scheduled(queue, klass, *args) ⇒ Object

resque-scheduler calls this method to put the job into the current queue



57
58
59
# File 'lib/resque/integration/unique.rb', line 57

# Enqueue a scheduled job into the given queue.
#
# The class is resolved from +klass+ via `constantize`, and the job is
# enqueued through that class's `enqueue_to`.
def scheduled(queue, klass, *args)
  job_class = klass.constantize

  job_class.enqueue_to(queue, *args)
end

#unique? ⇒ Boolean

Returns true because job is unique now

Returns:

  • (Boolean)


52
53
54
# File 'lib/resque/integration/unique.rb', line 52

# Always returns true: a job using this module is unique.
def unique?
  true
end