Module: Resque::StuckQueue

Defined in:
lib/resque_stuck_queue.rb,
lib/resque_stuck_queue/config.rb,
lib/resque_stuck_queue/version.rb,
lib/resque_stuck_queue/heartbeat_job.rb

Defined Under Namespace

Classes: Config, HeartbeatJob

Constant Summary collapse

HEARTBEAT_INTERVAL =

defaults

5 * 60
WATCHER_INTERVAL =

send heartbeat job every 5 minutes

5
TRIGGER_TIMEOUT =

check key is updated every 5 seconds.

60 * 60
TRIGGERED_HANDLER =

must be called by convention: type_handler

proc { |queue_name, lag| Resque::StuckQueue::LOGGER.info("Shit gone bad with them queues...on #{queue_name}. Lag time is #{lag}") }
RECOVERED_HANDLER =
proc { |queue_name, lag| Resque::StuckQueue::LOGGER.info("recovered queue phew #{queue_name}. Lag time is #{lag}") }
LOGGER =
Logger.new($stdout)
HEARTBEAT_KEY =
"resque-stuck-queue"
TRIGGERED_KEY =
"resque-stuck-queue-last-triggered"
VERSION =
"0.5.2"

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.configObject

Returns the value of attribute config.



18
19
20
# File 'lib/resque_stuck_queue.rb', line 18

# Reader for the module-level configuration object.
# Returns whatever is currently stored in @config (nil if nothing has been assigned).
def config
  @config
end

Class Method Details

.abort_on_exceptionObject



56
57
58
59
60
61
62
# File 'lib/resque_stuck_queue.rb', line 56

# Value used for Thread#abort_on_exception on the background thread.
#
# Returns config[:abort_on_exception] whenever it is not nil, so an explicit
# false is honoured; returns true when the option is absent.
def abort_on_exception
  override = config[:abort_on_exception]
  return true if override.nil? # nothing configured: default

  override # allow overriding w false
end

.force_stop!Object



108
109
110
111
112
# File 'lib/resque_stuck_queue.rb', line 108

# Stops immediately: logs, kills every thread tracked in @threads, then
# clears module state through reset!.
#
# Safe to call when @threads has never been assigned (it is only set by
# start); in that case no thread is killed and state is still reset.
# Returns whatever reset! returns.
def force_stop!
  logger.info("Force stopping")
  # Array() maps a nil @threads to [], and each (rather than map) is used
  # because kill is invoked purely for its side effect.
  Array(@threads).each(&:kill)
  reset!
end

.heartbeat_key_for(queue) ⇒ Object



32
33
34
35
36
37
38
# File 'lib/resque_stuck_queue.rb', line 32

# Builds the heartbeat key for +queue+ in the form "<queue>:<suffix>".
# The suffix is config[:heartbeat_key] when that is set (truthy),
# otherwise the HEARTBEAT_KEY constant.
def heartbeat_key_for(queue)
  suffix = config[:heartbeat_key] || HEARTBEAT_KEY
  "#{queue}:#{suffix}"
end

.heartbeat_keysObject



48
49
50
# File 'lib/resque_stuck_queue.rb', line 48

# Heartbeat keys for every entry of queues, in the same order.
def heartbeat_keys
  queues.map do |queue_name|
    heartbeat_key_for(queue_name)
  end
end

.log_starting_infoObject



145
146
147
# File 'lib/resque_stuck_queue.rb', line 145

# Logs, at info level, the inspected configuration the module is starting with.
def log_starting_info
  message = "Starting StuckQueue with config: #{config.inspect}"
  logger.info(message)
end

.log_watcher_info(queue_name) ⇒ Object



149
150
151
152
153
154
155
156
# File 'lib/resque_stuck_queue.rb', line 149

# Logs two info lines for +queue_name+: its current lag_time, and either the
# value returned by last_triggered or a note that no last trigger exists.
def log_watcher_info(queue_name)
  logger.info("Lag time for #{queue_name} is #{lag_time(queue_name).inspect} seconds.")

  triggered_ago = last_triggered(queue_name)
  trigger_line =
    if triggered_ago
      "Last triggered for #{queue_name} is #{triggered_ago.inspect} seconds."
    else
      "No last trigger found for #{queue_name}."
    end
  logger.info(trigger_line)
end

.loggerObject



24
25
26
# File 'lib/resque_stuck_queue.rb', line 24

# Logger used by this module, memoized in @logger.
# On first use it is config[:logger] when set, otherwise StuckQueue::LOGGER.
def logger
  return @logger if @logger

  @logger = config[:logger] || StuckQueue::LOGGER
end

.queuesObject



52
53
54
# File 'lib/resque_stuck_queue.rb', line 52

# Queue names this module works with, memoized in @queues.
# On first use it is config[:queues] when set, otherwise [:app].
def queues
  return @queues if @queues

  @queues = config[:queues] || [:app]
end

.redisObject



28
29
30
# File 'lib/resque_stuck_queue.rb', line 28

# Redis handle taken from config[:redis] and memoized in @redis.
# While config[:redis] is nil nothing is memoized and nil is returned.
def redis
  return @redis if @redis

  @redis = config[:redis]
end

.reset!Object



114
115
116
117
118
119
120
# File 'lib/resque_stuck_queue.rb', line 114

# Clean state so we can stop and start in the same process:
# installs a fresh Config, marks the module as not running and drops the
# memoized queues and logger. Returns nil.
# NOTE(review): @redis, @threads and @stopped are left untouched here — confirm that is intended.
def reset!
  @config = Config.new # clear, unfreeze
  @running = false
  @queues = @logger = nil
end

.reset_keysObject



122
123
124
125
126
127
# File 'lib/resque_stuck_queue.rb', line 122

def reset_keys
  queues.each do |qn|
    redis.del(heartbeat_key_for(qn))
    redis.del(triggered_key_for(qn))
  end
end

.startObject

Call this after setting config. Once started, you shouldn’t be allowed to modify it.



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/resque_stuck_queue.rb', line 72

# Runs the monitor in the calling thread and does not return until every
# thread in @threads has been joined.
#
# State flags: @running is set to true and @stopped to false on entry;
# @stopped becomes true only after the joins return.
# The config is validated and then frozen before anything else happens.
def start
  @running = true
  @stopped = false
  @threads = []
  # Validate first, then freeze: the config cannot be modified after this
  # point (reset! later replaces it with a fresh, unfrozen Config).
  config.validate_required_keys!
  config.freeze

  log_starting_info

  # Delete each queue's heartbeat and triggered keys before monitoring begins.
  reset_keys

  # Only hand our redis to RedisClassy when it has none configured yet.
  RedisClassy.redis = redis if RedisClassy.redis.nil?

  # NOTE(review): defined elsewhere; presumably sets the process title — confirm.
  pretty_process_name

  # NOTE(review): these helpers are defined elsewhere; presumably each one
  # spawns a thread and appends it to @threads — confirm.
  setup_heartbeat_thread
  setup_watcher_thread

  setup_warn_thread

  # fo-eva. Blocks here until every thread in @threads has finished.
  @threads.map(&:join)

  logger.info("threads stopped")
  @stopped = true
end

.start_in_backgroundObject



64
65
66
67
68
69
# File 'lib/resque_stuck_queue.rb', line 64

# Runs start on a new thread and returns that Thread.
# The thread's abort_on_exception flag is set from abort_on_exception
# before start is invoked.
def start_in_background
  Thread.new do
    runner = Thread.current
    runner.abort_on_exception = abort_on_exception
    start
  end
end

.stopObject



99
100
101
102
103
104
105
106
# File 'lib/resque_stuck_queue.rb', line 99

# Requests a stop: resets module state, waits until @stopped is no longer
# false, then logs "Stopped".
#
# The comparison is deliberately against false: a nil @stopped (never
# assigned) does not wait at all.
def stop
  reset!
  # wait for clean thread shutdown, polling once per second
  sleep 1 while @stopped == false
  logger.info("Stopped")
end

.stopped?Boolean

Returns:

  • (Boolean)


129
130
131
# File 'lib/resque_stuck_queue.rb', line 129

# Returns the current value of @stopped.
# start sets it to false on entry and to true after its threads have been
# joined; it is nil if it has never been assigned.
def stopped?
  @stopped
end

.trigger_handler(queue_name, type) ⇒ Object



133
134
135
136
137
138
139
140
141
142
143
# File 'lib/resque_stuck_queue.rb', line 133

# Invokes the handler of the given +type+ for +queue_name+.
#
# type must be :recovered or :triggered. The handler is config[:<type>_handler]
# when set, otherwise the <TYPE>_HANDLER constant; it is called with the queue
# name and lag_time(queue_name). manual_refresh is called afterwards and its
# result is returned.
#
# Any StandardError raised in the body (including the invalid-type error) is
# logged at info level together with its backtrace and then re-raised.
def trigger_handler(queue_name, type)
  unless [:recovered, :triggered].include?(type)
    raise 'Must trigger either the recovered or triggered handler!'
  end

  handler_name = :"#{type}_handler"
  logger.info("Triggering #{type} handler for #{queue_name} at #{Time.now}.")
  handler = config[handler_name] || const_get(handler_name.upcase)
  handler.call(queue_name, lag_time(queue_name))
  manual_refresh(queue_name, type)
rescue => error
  logger.info("handler #{type} for #{queue_name} crashed: #{error.inspect}")
  trace = error.backtrace.join("\n")
  logger.info("\n#{trace}")
  raise error
end

.triggered_key_for(queue) ⇒ Object



40
41
42
43
44
45
46
# File 'lib/resque_stuck_queue.rb', line 40

# Builds the last-triggered key for +queue+ in the form "<queue>:<suffix>".
# The suffix is config[:triggered_key] when that is set (truthy),
# otherwise the TRIGGERED_KEY constant.
def triggered_key_for(queue)
  suffix = config[:triggered_key] || TRIGGERED_KEY
  "#{queue}:#{suffix}"
end