Module: Resque::Durable
- Defined in:
- lib/resque/durable.rb,
lib/resque/durable/guid.rb,
lib/resque/durable/monitor.rb,
lib/resque/durable/version.rb,
lib/resque/durable/queue_audit_methods.rb,
lib/resque/durable/background_heartbeat.rb
Defined Under Namespace
Modules: GUID, Monitor, QueueAuditMethods
Classes: BackgroundHeartbeat
Constant Summary
collapse
- VERSION =
"4.4.0"
Class Method Summary
collapse
Instance Method Summary
collapse
Class Method Details
.extended(base) ⇒ Object
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
# File 'lib/resque/durable.rb', line 9
def self.extended(base)
base.cattr_accessor :job_timeout
base.job_timeout = 10.minutes
base.cattr_accessor :background_heartbeat_interval
base.cattr_accessor :auditor
base.auditor = QueueAudit
end
|
Instance Method Details
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
# File 'lib/resque/durable.rb', line 57
def around_perform_manage_audit(*args)
if a = audit(args)
return if a.complete?
if background_heartbeat_interval
raise "background_heartbeat_interval (#{background_heartbeat_interval.inspect}) be smaller than job_timeout (#{job_timeout.inspect})" if background_heartbeat_interval >= job_timeout
BackgroundHeartbeat.new(audit(args), background_heartbeat_interval).with_heartbeat do
yield
end
else
a.heartbeat!
yield
end
if requeue_immediately
a.reset_backoff!
else
a.complete!
end
else
yield
end
ensure
@requeue_immediately = false
end
|
#audit(args) ⇒ Object
45
46
47
48
49
|
# File 'lib/resque/durable.rb', line 45
def audit(args)
audit = auditor.find_by_enqueued_id(args.last)
audit_failed(ArgumentError.new("Could not find audit: #{args.last}")) if audit.nil?
audit
end
|
#audit_failed(e, args) ⇒ Object
98
99
100
|
# File 'lib/resque/durable.rb', line 98
def audit_failed(e, args)
raise e
end
|
#build_audit(args) ⇒ Object
94
95
96
|
# File 'lib/resque/durable.rb', line 94
def build_audit(args)
auditor.initialize_by_klass_and_args(self, args)
end
|
90
91
92
|
# File 'lib/resque/durable.rb', line 90
def disable_requeue_immediately
@requeue_immediately = false
end
|
#enqueue(*args) ⇒ Object
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
# File 'lib/resque/durable.rb', line 25
def enqueue(*args)
if args.last.is_a?(auditor)
audit = args.pop
else
audit = build_audit(args)
end
args << audit.enqueued_id
begin
audit.enqueued!
rescue Exception => e
audit_failed(e, args)
end
Resque.enqueue(self, *args)
rescue Exception => e
enqueue_failed(e, args)
end
|
#enqueue_failed(e, args) ⇒ Object
102
103
104
|
# File 'lib/resque/durable.rb', line 102
def enqueue_failed(e, args)
raise e
end
|
#heartbeat(args) ⇒ Object
51
52
53
54
55
|
# File 'lib/resque/durable.rb', line 51
def heartbeat(args)
if a = audit(args)
a.heartbeat!
end
end
|
82
83
84
|
# File 'lib/resque/durable.rb', line 82
def requeue_immediately
@requeue_immediately
end
|
86
87
88
|
# File 'lib/resque/durable.rb', line 86
def requeue_immediately!
@requeue_immediately = true
end
|