Module: Resque::Durable

Defined in:
lib/resque/durable.rb,
lib/resque/durable/guid.rb,
lib/resque/durable/monitor.rb,
lib/resque/durable/queue_audit.rb,
lib/resque/durable/background_heartbeat.rb

Defined Under Namespace

Modules: GUID, Monitor Classes: BackgroundHeartbeat, QueueAudit

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.extended(base) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/resque/durable.rb', line 8

def self.extended(base)
  # The duration since the last heartbeat that the monitor will wait before
  # re-enqueing the job.
  base.cattr_accessor :job_timeout
  base.job_timeout = 10.minutes

  # How frequently a background thread will optimistically heartbeat the
  # QueueAudit. Value must be smaller than job_timeout. Currently opt-in.
  #
  # Recommended value: `15.seconds`
  base.cattr_accessor :background_heartbeat_interval

  base.cattr_accessor :auditor
  base.auditor = QueueAudit
end

Instance Method Details

#around_perform_manage_audit(*args) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/resque/durable.rb', line 56

def around_perform_manage_audit(*args)
  if a = audit(args)
    return if a.complete?
    if background_heartbeat_interval
      raise "background_heartbeat_interval (#{background_heartbeat_interval.inspect}) be smaller than job_timeout (#{job_timeout.inspect})" if background_heartbeat_interval >= job_timeout
      BackgroundHeartbeat.new(audit(args), background_heartbeat_interval).with_heartbeat do
        yield
      end
    else
      a.heartbeat!
      yield
    end

    if requeue_immediately
      a.reset_backoff!
    else
      a.complete!
    end
  else
    yield
  end
ensure
  @requeue_immediately = false
end

#audit(args) ⇒ Object



44
45
46
47
48
# File 'lib/resque/durable.rb', line 44

def audit(args)
  audit = auditor.find_by_enqueued_id(args.last)
  audit_failed(ArgumentError.new("Could not find audit: #{args.last}")) if audit.nil?
  audit
end

#audit_failed(e, args) ⇒ Object



93
94
95
# File 'lib/resque/durable.rb', line 93

def audit_failed(e, args)
  raise e
end

#build_audit(args) ⇒ Object



89
90
91
# File 'lib/resque/durable.rb', line 89

def build_audit(args)
  auditor.initialize_by_klass_and_args(self, args)
end

#enqueue(*args) ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/resque/durable.rb', line 24

def enqueue(*args)
  if args.last.is_a?(auditor)
    # the audit-is-re-enqueing case
    audit = args.pop
  else
    audit = build_audit(args)
  end

  args << audit.enqueued_id
  begin
    audit.enqueued!
  rescue Exception => e
    audit_failed(e, args)
  end

  Resque.enqueue(self, *args)
rescue Exception => e
  enqueue_failed(e, args)
end

#enqueue_failed(e, args) ⇒ Object



97
98
99
# File 'lib/resque/durable.rb', line 97

def enqueue_failed(e, args)
  raise e
end

#heartbeat(args) ⇒ Object



50
51
52
53
54
# File 'lib/resque/durable.rb', line 50

def heartbeat(args)
  if a = audit(args)
    a.heartbeat!
  end
end

#requeue_immediatelyObject



81
82
83
# File 'lib/resque/durable.rb', line 81

def requeue_immediately
  @requeue_immediately
end

#requeue_immediately!Object



85
86
87
# File 'lib/resque/durable.rb', line 85

def requeue_immediately!
  @requeue_immediately = true
end