Class: Delayed::Job
- Inherits:
-
Object
- Object
- Delayed::Job
- Defined in:
- lib/delayed/job.rb
Constant Summary collapse
- MAX_RUN_TIME =
4.hours
- ParseObjectFromYaml =
/\!ruby\/\w+\:([^\s]+)/
Class Method Summary collapse
-
.enqueue(*args, &block) ⇒ Object
Add a job to the queue.
- .expanded_list(sqs_queue) ⇒ Object
- .job_list(sqs_queue) ⇒ Object
- .list(sqs_queue) ⇒ Object
-
.work_off(sqs_queue) ⇒ Object
Do num jobs and return stats on success/failure.
Instance Method Summary collapse
-
#initialize(message) ⇒ Job
constructor
A new instance of Job.
-
#invoke_job ⇒ Object
Moved into its own method so that new_relic can trace it.
-
#log_error(error) ⇒ Object
This is a good hook if you need to report job processing errors in additional or different ways.
- #name ⇒ Object
- #payload_object ⇒ Object
- #run(max_run_time = MAX_RUN_TIME) ⇒ Object
- #to_s ⇒ Object
Constructor Details
#initialize(message) ⇒ Job
11 12 13 14 |
# File 'lib/delayed/job.rb', line 11 def initialize() @message = @logger = defined?(RAILS_DEFAULT_LOGGER) ? RAILS_DEFAULT_LOGGER : Logger.new(STDOUT) end |
Class Method Details
.enqueue(*args, &block) ⇒ Object
Add a job to the queue
67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/delayed/job.rb', line 67 def self.enqueue(*args, &block) sqs_queue = args.shift raise ArgumentError, 'SQS Queue was not provided' unless sqs_queue.is_a? RightAws::SqsGen2::Queue object = block_given? ? EvaledJob.new(&block) : args.shift unless object.respond_to?(:perform) || block_given? raise ArgumentError, 'Cannot enqueue items which do not respond to perform' end sqs_queue. object.to_yaml end |
.expanded_list(sqs_queue) ⇒ Object
120 121 122 |
# File 'lib/delayed/job.rb', line 120 def self.(sqs_queue) job_list(sqs_queue).collect {|j| j.to_s} end |
.job_list(sqs_queue) ⇒ Object
110 111 112 113 114 |
# File 'lib/delayed/job.rb', line 110 def self.job_list(sqs_queue) jobs = [] while( = sqs_queue.receive); jobs << Job.new(); end jobs end |
.list(sqs_queue) ⇒ Object
116 117 118 |
# File 'lib/delayed/job.rb', line 116 def self.list(sqs_queue) job_list(sqs_queue).collect {|j| j.name} end |
.work_off(sqs_queue) ⇒ Object
Do num jobs and return stats on success/failure. Exit early if interrupted.
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/delayed/job.rb', line 82 def self.work_off(sqs_queue) exit_flag = false until($exit) success, failure = 0, 0 while( = sqs_queue.receive) if .to_s == 'stop' exit_flag = true break end case Job.new().run when true success += 1 when false failure += 1 else break # leave if no work could be done end break if $exit # leave if we're exiting end break if exit_flag sleep(60) end end |
Instance Method Details
#invoke_job ⇒ Object
Moved into its own method so that new_relic can trace it.
62 63 64 |
# File 'lib/delayed/job.rb', line 62 def invoke_job payload_object.perform end |
#log_error(error) ⇒ Object
This is a good hook if you need to report job processing errors in additional or different ways
56 57 58 59 |
# File 'lib/delayed/job.rb', line 56 def log_error(error) @logger.error "* [JOB] #{name} failed with #{error.class.name}: #{error.}" @logger.error error.backtrace.inspect end |
#name ⇒ Object
20 21 22 23 24 25 26 27 28 29 |
# File 'lib/delayed/job.rb', line 20 def name @name ||= begin payload = payload_object if payload.respond_to?(:display_name) payload.display_name else payload.class.name end end end |
#payload_object ⇒ Object
16 17 18 |
# File 'lib/delayed/job.rb', line 16 def payload_object @payload_object ||= deserialize(@message.to_s) end |
#run(max_run_time = MAX_RUN_TIME) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/delayed/job.rb', line 38 def run(max_run_time = MAX_RUN_TIME) begin runtime = Benchmark.realtime do invoke_job # TODO: raise error if takes longer than max_run_time end # TODO : warn if runtime > max_run_time @logger.info "* [JOB] #{name} completed after %.4f" % runtime # The message is finally delete from the queue @message.delete true rescue Exception => e log_error(e) false # work failed end end |
#to_s ⇒ Object
31 32 33 34 35 36 |
# File 'lib/delayed/job.rb', line 31 def to_s @to_s ||= begin payload = payload_object payload.respond_to?(:examine) ? payload.examine : name end end |