Module: Resque::Single::ClassMethods

Defined in:
lib/resque/single.rb

Instance Method Summary collapse

Instance Method Details

#after_dequeue_lock(*args) ⇒ Object

When a job is dequeued, its lock should be removed.



90
91
92
# File 'lib/resque/single.rb', line 90

# Dequeue hook that calls +unlock+ with the dequeued job's arguments.
#
# Nothing happens when +args+ contains no truthy element (for example
# when it is empty), so +unlock+ is never called without arguments.
#
# @param args [Array] arguments the job was dequeued with
# @return [Object, nil] the result of +unlock+, or nil when skipped
def after_dequeue_lock(*args)
  return unless args.any?

  unlock(*args)
end

#after_dequeue_meta(*args) ⇒ Object

Marks the job's metadata as failed if the dequeue succeeds.



95
96
97
98
99
# File 'lib/resque/single.rb', line 95

# Dequeue hook that marks the dequeued job's metadata as failed.
#
# The first argument is taken as the meta id. When it is missing, or
# +get_meta+ finds nothing for it, the hook does nothing.
#
# @param args [Array] dequeue arguments; only the first element is read
# @return [Object, nil] the result of +fail!+, or nil when nothing was found
def after_dequeue_meta(*args)
  job_meta_id = args.first
  return unless job_meta_id

  job_meta = get_meta(job_meta_id)
  job_meta.fail! if job_meta
end

#before_dequeue_lock(*args) ⇒ Object

Before dequeuing, check whether the job is currently running.



83
84
85
86
87
# File 'lib/resque/single.rb', line 83

# Pre-dequeue check: reports whether the job identified by the first
# argument is not currently being worked on.
#
# The falsy value is passed through unchanged at each step (nil stays nil,
# false stays false).
# NOTE(review): the caller presumably treats nil and false differently — confirm.
#
# @param args [Array] dequeue arguments; only the first element (meta id) is read
# @return [Boolean, nil] the falsy meta id or falsy +get_meta+ result when
#   either is missing, otherwise the negation of +working?+
def before_dequeue_lock(*args)
  job_meta_id = args.first
  return job_meta_id unless job_meta_id

  job_meta = get_meta(job_meta_id)
  return job_meta unless job_meta

  !job_meta.working?
end

#dequeue(*args) ⇒ Object

Dequeues a unique job.



120
121
122
# File 'lib/resque/single.rb', line 120

# Removes this job from the queue through +Resque.dequeue+.
#
# The meta id computed from +args+ is passed ahead of the arguments
# themselves.
#
# @param args [Array] the job arguments
# @return [Object] the result of +Resque.dequeue+
def dequeue(*args)
  unique_id = meta_id(*args)
  Resque.dequeue(self, unique_id, *args)
end

#enqueue_with_check(*args) ⇒ Object

Overrides the enqueue method so that it returns the existing metadata if the job is already queued.



125
126
127
128
129
130
# File 'lib/resque/single.rb', line 125

# Enqueues the job unless an equivalent one is already queued or running.
#
# When +enqueued?+ returns a truthy value for +args+, that value is returned
# and nothing new is enqueued. Otherwise the call is handed on to
# +enqueue_without_check+.
#
# @param args [Array] the job arguments
# @return [Object] the existing meta, or the result of +enqueue_without_check+
def enqueue_with_check(*args) #:nodoc:
  existing_meta = enqueued?(*args)
  return existing_meta if existing_meta

  enqueue_without_check(*args)
end

#enqueued?(*args) ⇒ Boolean

Is the job already in the queue or being processed?

Returns:

  • (Boolean)


102
103
104
105
106
107
108
109
# File 'lib/resque/single.rb', line 102

# Returns the meta of an already queued or running job with these arguments.
#
# A job counts as enqueued only while +locked?+ is true for +args+.
#
# @param args [Array] the job arguments
# @return [Object, nil] the result of +get_meta+ for the job's meta id,
#   or nil when the job is not locked
def enqueued?(*args)
  return nil unless locked?(*args)

  get_meta(meta_id(*args))
end

#execute(*) ⇒ Object

Raises:

  • (NotImplementedError)


73
74
75
# File 'lib/resque/single.rb', line 73

# Default job body. It always raises, so it has to be replaced with a
# real implementation before a job can run.
#
# @raise [NotImplementedError] always
def execute(*)
  message = "You should implement `execute' method"
  raise NotImplementedError, message
end

#lock(meta_id, *args) ⇒ Object

The lock ID should be independent of the meta ID.



54
55
56
# File 'lib/resque/single.rb', line 54

# Builds the lock key for a job.
#
# The key is "lock:<name>-<sha1>", where the SHA1 digest covers the string
# form (+obj_to_string+) of the unique arguments selected by +lock_on+.
#
# @param meta_id [Object] accepted but not used, so the key does not depend on it
# @param args [Array] the job arguments
# @return [String] the lock key
def lock(meta_id, *args)
  unique_args = lock_on[*args]
  digest = Digest::SHA1.hexdigest(obj_to_string(unique_args))
  "lock:#{name}-#{digest}"
end

#lock_on(&block) ⇒ Object

Gets or sets the proc that returns the unique arguments.



45
46
47
48
49
50
51
# File 'lib/resque/single.rb', line 45

# Reads or sets the proc that picks the unique arguments of a job.
#
# With a block, the block is stored and returned. Without one, the stored
# proc is returned; if none was stored yet, a default that returns all of
# its arguments is stored first.
#
# @yield [*args] the job arguments
# @return [Proc] the stored proc
def lock_on(&block)
  return @unique = block if block

  @unique ||= proc { |*args| args }
end

#locked?(*args) ⇒ Boolean

Returns true if the Resque job is in a locked state.

Returns:

  • (Boolean)


112
113
114
115
116
117
# File 'lib/resque/single.rb', line 112

# Tells whether an unexpired lock exists for a job with these arguments.
#
# The value stored under the lock key is compared with the current Unix
# time, so it is treated as an expiry timestamp in seconds. The key is read
# with a single GET. A separate EXISTS call would add a second round trip
# during which the key can vanish, and its return type (boolean or Integer)
# depends on the Redis client version.
#
# @param args [Array] the job arguments
# @return [Boolean] true when the key exists and its timestamp has not passed
def locked?(*args)
  key = lock(nil, *args)
  now = Time.now.to_i

  # A missing key reads back as nil, which means "not locked".
  expires_at = Resque.redis.get(key)
  !expires_at.nil? && now <= expires_at.to_i
end

#meta ⇒ Object

Gets the meta object associated with the job.



64
65
66
# File 'lib/resque/single.rb', line 64

# Looks up the meta object for the id held in @meta_id.
#
# NOTE(review): nothing visible here assigns @meta_id — confirm where it is set.
#
# @return [Object] whatever +get_meta+ returns for @meta_id
def meta
  get_meta(@meta_id)
end

#meta_id(*args) ⇒ Object

Overrides meta_id so that it generates the same MetaID for jobs with the same arguments.



59
60
61
# File 'lib/resque/single.rb', line 59

# Derives a deterministic meta id from the job arguments.
#
# The id is the SHA1 hex digest of the fixed prefix 'resque-unique', the
# receiver, and the unique arguments selected by +lock_on+, joined into one
# string.
#
# NOTE(review): the join uses no separator, so different argument lists can
# flatten to the same string (e.g. [1, 23] and [12, 3]) — confirm this is
# acceptable before relying on id uniqueness.
#
# @param args [Array] the job arguments
# @return [String] 40-character hexadecimal digest
def meta_id(*args)
  unique_args = lock_on[*args]
  seed = ['resque-unique', self, unique_args].join
  Digest::SHA1.hexdigest(seed)
end

#on_failure_lock(e, *args) ⇒ Object

When a job fails, its lock should be removed.



78
79
80
# File 'lib/resque/single.rb', line 78

# Failure hook that calls +unlock+ with the failed job's arguments.
#
# @param e [Object] the failure passed to the hook; not used by this method
# @param args [Array] forwarded to +unlock+ unchanged
# @return [Object] the result of +unlock+
def on_failure_lock(e, *args)
  unlock(*args)
end

#perform(meta_id, *args) ⇒ Object

Overrides the default perform method.



69
70
71
# File 'lib/resque/single.rb', line 69

# Default perform implementation: forwards the job arguments to +execute+.
#
# @param meta_id [Object] accepted but not used by this method
# @param args [Array] passed to +execute+ unchanged
# @return [Object] the result of +execute+
def perform(meta_id, *args)
  execute(*args)
end

#retry_args(*args) ⇒ Object



39
40
41
42
# File 'lib/resque/single.rb', line 39

# Returns the given arguments without their first element.
#
# NOTE(review): the dropped element is presumably the meta id that precedes
# the real job arguments — confirm against the caller.
#
# @param args [Array] arguments of the job being retried
# @return [Array] every argument after the first (empty when there are none)
def retry_args(*args)
  args.drop(1)
end

#scheduled(queue, klass, *args) ⇒ Object



35
36
37
# File 'lib/resque/single.rb', line 35

# Enqueues a job on the class named by +klass+.
#
# NOTE(review): presumably a scheduler callback — confirm against the caller.
#
# @param queue [Object] accepted but not used by this method
# @param klass [#constantize] resolved with +constantize+ (presumably
#   ActiveSupport's String#constantize — confirm)
# @param args [Array] forwarded to +enqueue+ on the resolved object
# @return [Object] the result of +enqueue+
def scheduled(queue, klass, *args)
  klass.constantize.enqueue(*args)
end