Module: Resque::Plugins::RateLimited

Included in:
BaseApiQueue
Defined in:
lib/resque/plugins/rate_limited/rate_limited.rb,
lib/resque/plugins/rate_limited/apis/twitter_queue.rb,
lib/resque/plugins/rate_limited/apis/base_api_queue.rb,
lib/resque/plugins/rate_limited/apis/evernote_queue.rb,
lib/resque/plugins/rate_limited/apis/angellist_queue.rb,
lib/resque/plugins/rate_limited/rate_limited_un_pause.rb

Defined Under Namespace

Classes: AngellistQueue, BaseApiQueue, EvernoteQueue, TwitterQueue, UnPause

Constant Summary collapse

RESQUE_PREFIX =
'queue:'.freeze
MUTEX =
'Resque::Plugins::RateLimited'.freeze

Instance Method Summary collapse

Instance Method Details

#around_perform_with_check_and_requeue(*params) ⇒ Object



7
8
9
10
11
12
13
14
15
# File 'lib/resque/plugins/rate_limited/rate_limited.rb', line 7

# Wraps job execution: when the queue is paused, the job is pushed onto the
# paused queue instead of being run.
#
# The method name follows Resque's `around_perform*` hook naming, so this is
# presumably invoked by Resque with `self` being the job class.
#
# @param params [Array] the job arguments, forwarded unchanged on requeue
# @yield runs the wrapped job when the queue is not paused
# @return [Object, nil] the block's result, or nil when the job was requeued
def around_perform_with_check_and_requeue(*params)
  requeued = false
  # The paused check and the enqueue happen under the same lock.
  with_lock do
    if paused?
      requeued = true
      Resque.enqueue_to(paused_queue_name, self, *params)
    end
  end
  # The job body runs outside the lock.
  yield unless requeued
end

#find_class(klass) ⇒ Object



83
84
85
86
87
88
89
# File 'lib/resque/plugins/rate_limited/rate_limited.rb', line 83

# Resolves a class reference to the constant it names.
#
# Accepts an already-resolved Class/Module (returned untouched) or a constant
# name given as a String or Symbol. Namespaced names ("A::B::C") are resolved
# one segment at a time starting from Object. A leading "::" (root-qualified
# name such as "::A::B") is accepted and ignored.
#
# @param klass [Module, String, Symbol] the constant or its name
# @return [Module] the resolved constant
# @raise [NameError] if the name is empty, malformed or not defined
def find_class(klass)
  return klass if klass.is_a?(Module)

  # Strip the root qualifier so splitting does not produce an empty first segment.
  name = klass.to_s.sub(/\A::/, '')
  return Object.const_get(name) unless name.include?('::')

  name.split('::').reduce(Object) do |mod, class_name|
    mod.const_get(class_name)
  end
end

#pause ⇒ Object



50
51
52
53
54
55
56
# File 'lib/resque/plugins/rate_limited/rate_limited.rb', line 50

# Pauses the queue by renaming its Redis key to the paused queue's key.
#
# @return [Boolean] true when the rename command completed without error,
#   false when Redis reported that the active queue key does not exist
# @raise [Redis::CommandError] any other Redis command error is re-raised
def pause
  active_key = RESQUE_PREFIX + @queue.to_s
  paused_key = RESQUE_PREFIX + paused_queue_name
  # NOTE(review): the reply of renamenx is ignored, so true is returned even
  # if the destination key already existed — confirm this is intended.
  Resque.redis.renamenx(active_key, paused_key)
  true
rescue Redis::CommandError => error
  return false if error.message == 'ERR no such key'

  raise
end

#pause_until(timestamp) ⇒ Object



38
39
40
# File 'lib/resque/plugins/rate_limited/rate_limited.rb', line 38

# Pauses the queue and, if the pause took effect, schedules an UnPause for it.
#
# @param timestamp [Object] passed straight to UnPause.enqueue together with
#   this object's `name` (presumably the time to un-pause at — confirm in UnPause)
# @return [Object, nil] the result of UnPause.enqueue, or nil when `pause`
#   returned a falsy value
def pause_until(timestamp)
  return unless pause

  UnPause.enqueue(timestamp, name)
end

#paused?(unknown = false) ⇒ Boolean



58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/resque/plugins/rate_limited/rate_limited.rb', line 58

# Reports whether the queue is currently paused.
#
# The state is derived from which Redis key exists: the active queue key means
# "not paused", the paused queue key means "paused". When neither exists the
# queue is empty and the state cannot be determined.
#
# @param unknown [Object] what to return if the queue is empty, and so the
#   state is unknown
# @return [Boolean, Object] false when Resque runs inline or the active queue
#   key exists, true when only the paused queue key exists, otherwise `unknown`
def paused?(unknown = false)
  return false if Resque.inline

  # `exists` replies with true/false on older redis clients and with an
  # Integer count on newer ones; 0 is truthy in Ruby, so normalise the reply.
  key_present = lambda do |queue_name|
    reply = Resque.redis.exists(RESQUE_PREFIX + queue_name)
    reply.is_a?(Integer) ? reply > 0 : reply
  end

  return false if key_present.call(@queue.to_s)
  return true if key_present.call(paused_queue_name)

  unknown
end

#paused_queue_name ⇒ Object



71
72
73
# File 'lib/resque/plugins/rate_limited/rate_limited.rb', line 71

# Name of the queue that holds jobs while this queue is paused.
#
# @return [String] the string form of @queue with a "_paused" suffix
def paused_queue_name
  "#{@queue}_paused"
end

#rate_limited_enqueue(klass, *params) ⇒ Object



17
18
19
20
21
22
23
24
25
# File 'lib/resque/plugins/rate_limited/rate_limited.rb', line 17

# Enqueues a job, routing it to the paused queue while the queue is paused.
#
# The paused check and the enqueue are performed under the same lock.
#
# @param klass [Object] the job class handed to Resque.enqueue_to
# @param params [Array] the job arguments
# @return [Object] whatever `with_lock` returns for the block
def rate_limited_enqueue(klass, *params)
  with_lock do
    destination = paused? ? paused_queue_name : @queue
    Resque.enqueue_to(destination, klass, *params)
  end
end

#rate_limited_requeue(klass, *params) ⇒ Object



27
28
29
30
31
32
33
34
35
36
# File 'lib/resque/plugins/rate_limited/rate_limited.rb', line 27

# Re-enqueues a job, routing it to the paused queue while the queue is paused.
#
# Unlike rate_limited_enqueue, an empty queue is treated as paused: if the
# queue is empty, this was the last job, so it goes to the paused queue.
#
# @param klass [Object] the job class handed to Resque.enqueue_to
# @param params [Array] the job arguments
# @return [Object] whatever `with_lock` returns for the block
def rate_limited_requeue(klass, *params)
  with_lock do
    # `true` is the answer paused? gives when neither queue key exists.
    destination = paused?(true) ? paused_queue_name : @queue
    Resque.enqueue_to(destination, klass, *params)
  end
end

#un_pause ⇒ Object



42
43
44
45
46
47
48
# File 'lib/resque/plugins/rate_limited/rate_limited.rb', line 42

# Resumes the queue by renaming the paused queue's Redis key back to the
# active queue's key.
#
# @return [Boolean] true when the rename command completed without error,
#   false when Redis reported that the paused queue key does not exist
# @raise [Redis::CommandError] any other Redis command error is re-raised
def un_pause
  paused_key = RESQUE_PREFIX + paused_queue_name
  active_key = RESQUE_PREFIX + @queue.to_s
  # NOTE(review): the reply of renamenx is ignored, so true is returned even
  # if the active key already existed — confirm this is intended.
  Resque.redis.renamenx(paused_key, active_key)
  true
rescue Redis::CommandError => error
  return false if error.message == 'ERR no such key'

  raise
end

#with_lock ⇒ Object



75
76
77
78
79
80
81
# File 'lib/resque/plugins/rate_limited/rate_limited.rb', line 75

# Runs the given block while holding the plugin-wide Redis mutex.
#
# When Resque runs inline the block is executed directly, without locking.
# Otherwise the lock is taken through RedisMutex under the MUTEX key; the
# `block: 60, expire: 120` options are presumably the wait and expiry times
# in seconds — confirm against the redis-mutex documentation.
#
# @yield the work to perform under the lock
# @return [Object] the block's result in inline mode, otherwise whatever
#   RedisMutex.with_lock returns
def with_lock
  return yield if Resque.inline

  RedisMutex.with_lock(MUTEX, block: 60, expire: 120) { yield }
end