Module: Resque::Durable
- Defined in:
- lib/resque/durable.rb,
lib/resque/durable/guid.rb,
lib/resque/durable/monitor.rb,
lib/resque/durable/queue_audit.rb,
lib/resque/durable/background_heartbeat.rb
Defined Under Namespace
Modules: GUID, Monitor
Classes: BackgroundHeartbeat, QueueAudit
Class Method Summary
collapse
Instance Method Summary
collapse
Class Method Details
.extended(base) ⇒ Object
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
# File 'lib/resque/durable.rb', line 8
def self.extended(base)
base.cattr_accessor :job_timeout
base.job_timeout = 10.minutes
base.cattr_accessor :background_heartbeat_interval
base.cattr_accessor :auditor
base.auditor = QueueAudit
end
|
Instance Method Details
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
# File 'lib/resque/durable.rb', line 56
def around_perform_manage_audit(*args)
if a = audit(args)
return if a.complete?
if background_heartbeat_interval
raise "background_heartbeat_interval (#{background_heartbeat_interval.inspect}) be smaller than job_timeout (#{job_timeout.inspect})" if background_heartbeat_interval >= job_timeout
BackgroundHeartbeat.new(audit(args), background_heartbeat_interval).with_heartbeat do
yield
end
else
a.heartbeat!
yield
end
if requeue_immediately
a.reset_backoff!
else
a.complete!
end
else
yield
end
ensure
@requeue_immediately = false
end
|
#audit(args) ⇒ Object
44
45
46
47
48
|
# File 'lib/resque/durable.rb', line 44
def audit(args)
audit = auditor.find_by_enqueued_id(args.last)
audit_failed(ArgumentError.new("Could not find audit: #{args.last}")) if audit.nil?
audit
end
|
#audit_failed(e, args) ⇒ Object
93
94
95
|
# File 'lib/resque/durable.rb', line 93
def audit_failed(e, args)
raise e
end
|
#build_audit(args) ⇒ Object
89
90
91
|
# File 'lib/resque/durable.rb', line 89
def build_audit(args)
auditor.initialize_by_klass_and_args(self, args)
end
|
#enqueue(*args) ⇒ Object
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
# File 'lib/resque/durable.rb', line 24
def enqueue(*args)
if args.last.is_a?(auditor)
audit = args.pop
else
audit = build_audit(args)
end
args << audit.enqueued_id
begin
audit.enqueued!
rescue Exception => e
audit_failed(e, args)
end
Resque.enqueue(self, *args)
rescue Exception => e
enqueue_failed(e, args)
end
|
#enqueue_failed(e, args) ⇒ Object
97
98
99
|
# File 'lib/resque/durable.rb', line 97
def enqueue_failed(e, args)
raise e
end
|
#heartbeat(args) ⇒ Object
50
51
52
53
54
|
# File 'lib/resque/durable.rb', line 50
def heartbeat(args)
if a = audit(args)
a.heartbeat!
end
end
|
81
82
83
|
# File 'lib/resque/durable.rb', line 81
def requeue_immediately
@requeue_immediately
end
|
85
86
87
|
# File 'lib/resque/durable.rb', line 85
def requeue_immediately!
@requeue_immediately = true
end
|