Class: Resque::Plugins::JobHistory::Job

Inherits:
HistoryDetails show all
Defined in:
lib/resque/plugins/job_history/job.rb

Overview

A class encompassing a single job.

Constant Summary

Constants inherited from HistoryDetails

HistoryDetails::MAX_LINEAR_HISTORY, HistoryDetails::NAME_SPACE

Instance Attribute Summary collapse

Attributes inherited from HistoryDetails

#class_name

Instance Method Summary collapse

Methods inherited from HistoryDetails

class_list_page_size, class_list_page_size=, #class_name_valid?, #clean_old_running_jobs, #finished_jobs, #job_history_base_key, job_history_key, #last_run, #linear_jobs, linear_page_size, linear_page_size=, #max_concurrent_jobs, max_linear_jobs, max_linear_jobs=, #num_finished_jobs, #num_running_jobs, #page_size, #redis, #running_jobs, #total_failed_jobs, #total_finished_jobs, #total_run_jobs

Constructor Details

#initialize(class_name, job_id) ⇒ Job

Returns a new instance of Job.



12
13
14
15
16
17
# File 'lib/resque/plugins/job_history/job.rb', line 12

# Creates the history handle for one run of a job class.
#
# @param class_name [Object] passed unchanged to the superclass initializer
# @param job_id [Object] identifier of the run; exposed afterwards via #job_id
def initialize(class_name, job_id)
  super(class_name)

  @job_id        = job_id
  # Starts out empty; nothing is assigned here beyond nil.
  @stored_values = nil
end

Instance Attribute Details

#job_id ⇒ Object

Returns the value of attribute job_id.



10
11
12
# File 'lib/resque/plugins/job_history/job.rb', line 10

# Reader for the job identifier.
#
# @return [Object] the current value of @job_id
def job_id
  @job_id
end

Instance Method Details

#abort ⇒ Object



102
103
104
105
106
# File 'lib/resque/plugins/job_history/job.rb', line 102

# Drops this job from the running-jobs list and then calls +reset+.
#
# @return [Object] whatever +reset+ returns
def abort
  active_list = running_jobs
  active_list.remove_job(job_id)

  reset
end

#args ⇒ Object



47
48
49
# File 'lib/resque/plugins/job_history/job.rb', line 47

# Returns the job's arguments by passing the stored +:args+ value through
# +decode_args+.
#
# @return [Object] the result of +decode_args+
def args
  encoded = stored_values[:args]

  decode_args(encoded)
end

#blank? ⇒ Boolean

Returns:

  • (Boolean)


31
32
33
# File 'lib/resque/plugins/job_history/job.rb', line 31

# Reports whether Redis holds no record for this job.
#
# Depending on the redis-rb release, +exists+ answers with either a boolean
# or the number of matching keys. Since +0+ is truthy in Ruby, negating an
# integer reply directly would report every job as present, so an integer
# reply is first reduced to "more than zero keys".
#
# @return [Boolean] true when the job key is not stored in Redis
def blank?
  reply = redis.exists(job_key)
  reply = reply.positive? if reply.is_a?(Integer)

  !reply
end

#cancel(caller_message = nil, start_time = nil, *args) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/resque/plugins/job_history/job.rb', line 108

# Marks the job as failed because it was cancelled or never reported an end.
#
# When +start_time+ is given, a job start is recorded first. If the job record
# is present, a generic error text (with +caller_message+ appended directly to
# it) is written to the "error" field. The failed counter is incremented
# whether or not the record is present, and the job is then finished.
#
# @param caller_message [String, nil] extra text appended to the stored error
# @param start_time [Object, nil] when present, forwarded to +record_job_start+
# @param args [Array] forwarded to +record_job_start+ together with +start_time+
# @return [Object] whatever +finish+ returns
def cancel(caller_message = nil, start_time = nil, *args)
  record_job_start(start_time, *args) if start_time.present?

  if present?
    failure_note = "Unknown - Job failed to signal ending after the configured purge time or was canceled manually.#{caller_message}"
    redis.hset(job_key, "error", failure_note)
  end

  redis.incr(total_failed_key)

  finish
end

#duration ⇒ Object



39
40
41
# File 'lib/resque/plugins/job_history/job.rb', line 39

# Elapsed time between the job's start and end.
#
# A missing start or end time is replaced by the current time. The clock is
# sampled once so that both fallbacks use the same instant: a job with neither
# timestamp then reports exactly zero rather than the small non-zero gap
# between two separate clock reads.
#
# @return [Float] +end_time - start_time+, with missing values replaced by now
def duration
  now = Time.now

  (end_time || now) - (start_time || now)
end

#end_time ⇒ Object



43
44
45
# File 'lib/resque/plugins/job_history/job.rb', line 43

# Returns the stored +:end_time+ value converted with +to_time+.
#
# The conversion goes through +try+, so a missing value yields nil.
#
# @return [Object, nil] the converted end time, or nil when none is stored
def end_time
  raw_stamp = stored_values[:end_time]

  raw_stamp.try(:to_time)
end

#error ⇒ Object



59
60
61
# File 'lib/resque/plugins/job_history/job.rb', line 59

# Returns whatever is stored under +:error+ for this job.
#
# @return [Object, nil] the stored error value
def error
  details = stored_values

  details[:error]
end

#failed(exception, start_time = nil, *args) ⇒ Object



91
92
93
94
95
96
97
98
99
100
# File 'lib/resque/plugins/job_history/job.rb', line 91

# Records that the job ended with an exception.
#
# When +start_time+ is given, a job start is recorded first. If the job record
# is present, the text produced by +exception_message+ is written to the
# "error" field. The failed counter is incremented whether or not the record
# is present, and the job is then finished.
#
# @param exception [Object] passed to +exception_message+ to build the stored text
# @param start_time [Object, nil] when present, forwarded to +record_job_start+
# @param args [Array] forwarded to +record_job_start+ together with +start_time+
# @return [Object] whatever +finish+ returns
def failed(exception, start_time = nil, *args)
  record_job_start(start_time, *args) if start_time.present?

  if present?
    redis.hset(job_key, "error", exception_message(exception))
  end
  redis.incr(total_failed_key)

  finish
end

#finish(start_time = nil, *args) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/resque/plugins/job_history/job.rb', line 74

# Marks the job as finished.
#
# When +start_time+ is given, a job start is recorded first. If the job record
# is present, the current UTC time is written to its "end_time" field and the
# job is added to the finished-jobs list. The job is removed from the
# running-jobs list and +reset+ is called in every case.
#
# @param start_time [Object, nil] when present, forwarded to +record_job_start+
# @param args [Array] forwarded to +record_job_start+ together with +start_time+
# @return [self]
def finish(start_time = nil, *args)
  record_job_start(start_time, *args) if start_time.present?

  if present?
    finished_at = Time.now.utc.to_s
    redis.hset(job_key, "end_time", finished_at)
    finished_jobs.add_job(job_id, class_name)
  end

  running_jobs.remove_job(job_id)
  reset

  self
end

#finished? ⇒ Boolean

Returns:

  • (Boolean)


27
28
29
# File 'lib/resque/plugins/job_history/job.rb', line 27

# Reports whether an end time has been stored for this job.
#
# @return [Boolean] true when the stored +:end_time+ value is present
def finished?
  finish_stamp = stored_values[:end_time]

  finish_stamp.present?
end

#job_key ⇒ Object



19
20
21
# File 'lib/resque/plugins/job_history/job.rb', line 19

# Builds the Redis key for this job: the job-history base key and the job id
# joined by a dot.
#
# @return [String]
def job_key
  format("%s.%s", job_history_base_key, job_id)
end

#purge ⇒ Object



138
139
140
141
142
143
144
145
146
147
# File 'lib/resque/plugins/job_history/job.rb', line 138

# Deletes every trace of this job: aborts it if it has not finished, removes
# it from the job lists, deletes its Redis key and calls +reset+.
#
# @return [Object] whatever +reset+ returns
def purge
  # To keep the counts honest, an unfinished job is aborted before removal.
  already_done = finished?
  abort unless already_done

  remove_from_job_lists
  redis.del(job_key)

  reset
end

#retry ⇒ Object



124
125
126
127
128
# File 'lib/resque/plugins/job_history/job.rb', line 124

# Enqueues the job again through Resque with its recorded arguments.
#
# Nothing is enqueued when +described_class+ is nil or false.
#
# @return [Object, nil] the result of +Resque.enqueue+, or nil when skipped
def retry
  Resque.enqueue(described_class, *args) if described_class
end

#safe_purge ⇒ Object



130
131
132
133
134
135
136
# File 'lib/resque/plugins/job_history/job.rb', line 130

# Purges the job only when none of the running, finished or linear job lists
# still contains its id.
#
# The lists are consulted in that order and checking stops at the first one
# that includes the job.
#
# @return [Object, nil] the result of +purge+, or nil when the job is still listed
def safe_purge
  still_listed = running_jobs.includes_job?(job_id) ||
                 finished_jobs.includes_job?(job_id) ||
                 linear_jobs.includes_job?(job_id)

  purge unless still_listed
end

#start(*args) ⇒ Object



63
64
65
66
67
68
69
70
71
72
# File 'lib/resque/plugins/job_history/job.rb', line 63

# Records the start of this job.
#
# Stores the current UTC time and the arguments through +record_job_start+,
# adds the job to the running-jobs list, adds it to the linear-history list
# unless the class is excluded from linear history, and passes the value
# returned by the running list's +add_job+ to +record_num_jobs+.
#
# @param args [Array] the job's arguments, forwarded to +record_job_start+
# @return [self]
def start(*args)
  started_at = Time.now.utc.to_s
  record_job_start(started_at, *args)

  running_count = running_jobs.add_job(job_id, class_name)
  unless class_exclude_from_linear_history
    linear_jobs.add_job(job_id, class_name)
  end
  record_num_jobs(running_count)

  self
end

#start_time ⇒ Object



23
24
25
# File 'lib/resque/plugins/job_history/job.rb', line 23

# Returns the stored +:start_time+ value converted with +to_time+.
#
# The conversion goes through +try+, so a missing value yields nil.
#
# @return [Object, nil] the converted start time, or nil when none is stored
def start_time
  raw_stamp = stored_values[:start_time]

  raw_stamp.try(:to_time)
end

#succeeded? ⇒ Boolean

Returns:

  • (Boolean)


35
36
37
# File 'lib/resque/plugins/job_history/job.rb', line 35

# Reports whether the job has no stored error.
#
# @return [Boolean] true when +error+ is blank
def succeeded?
  failure = error

  failure.blank?
end

#uncompressed_args ⇒ Object



51
52
53
54
55
56
57
# File 'lib/resque/plugins/job_history/job.rb', line 51

# Returns the job's arguments, expanded when the job class compressed them.
#
# The arguments are returned untouched when there is no job class, when there
# are no arguments, when the job class's singleton class does not include a
# module named "Resque::Plugins::Compressible", or when the class reports the
# arguments as not compressed. Otherwise the first argument's payload (symbol
# key first, then string key) is handed to the class's +uncompressed_args+.
#
# @return [Object] the plain arguments or the result of the class's +uncompressed_args+
def uncompressed_args
  return args if described_class.blank? || args.blank?

  compressible = described_class.singleton_class.included_modules.any? { |mod| mod.name == "Resque::Plugins::Compressible" }
  return args unless compressible && described_class.compressed?(args)

  described_class.uncompressed_args(args.first[:payload] || args.first["payload"])
end