Class: Delayed::Job

Inherits:
Object
  • Object
show all
Defined in:
lib/delayed/job.rb

Constant Summary collapse

MAX_RUN_TIME =
4.hours
ParseObjectFromYaml =
/\!ruby\/\w+\:([^\s]+)/

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(message) ⇒ Job



11
12
13
14
# File 'lib/delayed/job.rb', line 11

# Wraps a queue message in a Job.
#
# @param message the queue message backing this job; it is later read via
#   +to_s+ and removed via +delete+ by other methods of this class
#
# The logger is RAILS_DEFAULT_LOGGER when that constant is defined,
# otherwise a new Logger writing to STDOUT.
def initialize(message)
  @message = message
  @logger = if defined?(RAILS_DEFAULT_LOGGER)
              RAILS_DEFAULT_LOGGER
            else
              Logger.new(STDOUT)
            end
end

Class Method Details

.enqueue(*args, &block) ⇒ Object

Add a job to the queue

Raises:

  • (ArgumentError)


67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/delayed/job.rb', line 67

# Add a job to the queue.
#
# The first positional argument must be the SQS queue. The job is either the
# given block (wrapped in an EvaledJob) or the second positional argument,
# which must respond to +perform+. The job is serialized with +to_yaml+ and
# sent as the message body.
#
# @raise [ArgumentError] if the first argument is not a
#   RightAws::SqsGen2::Queue, or if no block is given and the job object
#   does not respond to +perform+
# @return whatever +sqs_queue.send_message+ returns
def self.enqueue(*args, &block)
  queue, *remaining = args
  unless queue.is_a?(RightAws::SqsGen2::Queue)
    raise ArgumentError, 'SQS Queue was not provided'
  end

  job = if block_given?
          EvaledJob.new(&block)
        else
          remaining.first
        end

  # A block-backed job is always accepted; anything else must be performable.
  if !job.respond_to?(:perform) && !block_given?
    raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
  end

  queue.send_message(job.to_yaml)
end

.expanded_list(sqs_queue) ⇒ Object



120
121
122
# File 'lib/delayed/job.rb', line 120

# Returns the +to_s+ description of every job currently receivable from the
# queue (see job_list for how the jobs are fetched).
#
# @param sqs_queue the queue to read from
# @return [Array] one string per job
def self.expanded_list(sqs_queue)
  jobs = job_list(sqs_queue)
  jobs.map(&:to_s)
end

.job_list(sqs_queue) ⇒ Object



110
111
112
113
114
# File 'lib/delayed/job.rb', line 110

# Drains the queue by calling +receive+ until it returns a falsy value and
# wraps every received message in a Job.
#
# @param sqs_queue the queue to read from
# @return [Array<Job>] the jobs, in the order their messages were received
def self.job_list(sqs_queue)
  collected = []
  message = sqs_queue.receive
  while message
    collected.push(Job.new(message))
    message = sqs_queue.receive
  end
  collected
end

.list(sqs_queue) ⇒ Object



116
117
118
# File 'lib/delayed/job.rb', line 116

# Returns the name of every job currently receivable from the queue (see
# job_list for how the jobs are fetched).
#
# @param sqs_queue the queue to read from
# @return [Array] one name per job
def self.list(sqs_queue)
  jobs = job_list(sqs_queue)
  jobs.map(&:name)
end

.work_off(sqs_queue) ⇒ Object

Work off jobs from the queue, tallying successes and failures, until a 'stop' message is received. Exit early if interrupted.



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/delayed/job.rb', line 82

# Works off jobs from the queue until a message whose text is 'stop' is
# received or the global $exit flag becomes truthy.
#
# Each pass drains the queue, running every received message as a Job. When
# the queue is empty (or a job reports that no work could be done) the worker
# sleeps for +poll_interval+ seconds and polls again.
#
# @param sqs_queue the queue to poll; must respond to +receive+
# @param poll_interval [Numeric] seconds to sleep between polling passes
# @return [Array(Integer, Integer)] total +[success, failure]+ counts
def self.work_off(sqs_queue, poll_interval = 60)
  success, failure = 0, 0
  stop_requested = false

  until $exit
    while (message = sqs_queue.receive)
      if message.to_s == 'stop'
        stop_requested = true
        break
      end

      case Job.new(message).run
      when true
        success += 1
      when false
        failure += 1
      else
        break # leave if no work could be done
      end

      break if $exit # leave if we're exiting
    end

    # Check both shutdown conditions BEFORE sleeping, so an exit request that
    # arrives while draining does not cost a full poll interval.
    break if stop_requested || $exit

    sleep(poll_interval)
  end

  [success, failure]
end

Instance Method Details

#invoke_job ⇒ Object

Moved into its own method so that new_relic can trace it.



62
63
64
# File 'lib/delayed/job.rb', line 62

# Performs the deserialized payload.
# Kept as its own method so that new_relic can trace it.
#
# @return whatever the payload's +perform+ returns
def invoke_job
  payload = payload_object
  payload.perform
end

#log_error(error) ⇒ Object

This is a good hook if you need to report job processing errors in additional or different ways



56
57
58
59
# File 'lib/delayed/job.rb', line 56

# Writes a failed job's error to the logger as two entries: a headline with
# the job name, error class and message, then the inspected backtrace.
#
# This is a good hook if you need to report job processing errors in
# additional or different ways.
#
# @param error [Exception] the error raised while running the job
def log_error(error)
  headline = "* [JOB] #{name} failed with #{error.class.name}: #{error.message}"
  @logger.error(headline)

  trace = error.backtrace.inspect
  @logger.error(trace)
end

#name ⇒ Object



20
21
22
23
24
25
26
27
28
29
# File 'lib/delayed/job.rb', line 20

# Human-readable job name, memoized after the first truthy result.
#
# Uses the payload's +display_name+ when the payload responds to it,
# otherwise the payload's class name.
def name
  return @name if @name

  job = payload_object
  @name = job.respond_to?(:display_name) ? job.display_name : job.class.name
end

#payload_object ⇒ Object



16
17
18
# File 'lib/delayed/job.rb', line 16

# The job's payload, produced by passing the message text (+@message.to_s+)
# to +deserialize+. The result is memoized once it is truthy.
def payload_object
  return @payload_object if @payload_object

  raw = @message.to_s
  @payload_object = deserialize(raw)
end

#run(max_run_time = MAX_RUN_TIME) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/delayed/job.rb', line 38

# Runs the job once.
#
# On success the elapsed time is logged and the message is deleted from the
# queue. On failure the error is passed to log_error and the message is left
# on the queue.
#
# @param max_run_time accepted for interface compatibility; it is not
#   enforced yet (see the TODOs below)
# @return [Boolean] true if the job completed, false if it raised
# @raise [SystemExit, SignalException] process-control exceptions are
#   re-raised so that Ctrl-C, signals and +exit+ are not mistaken for job
#   failures and swallowed
def run(max_run_time = MAX_RUN_TIME)
  runtime = Benchmark.realtime do
    invoke_job # TODO: raise error if takes longer than max_run_time
  end
  # TODO : warn if runtime > max_run_time
  @logger.info "* [JOB] #{name} completed after %.4f" % runtime
  # The message is finally deleted from the queue
  @message.delete
  true
rescue SystemExit, SignalException
  # Never treat a shutdown request as a failed job.
  raise
rescue Exception => e
  # Deliberately broad: a job may fail with non-StandardError exceptions
  # (e.g. ScriptError subclasses) and must not take the worker down.
  log_error(e)
  false # work failed
end

#to_s ⇒ Object



31
32
33
34
35
36
# File 'lib/delayed/job.rb', line 31

# Detailed description of the job, memoized after the first truthy result.
#
# Uses the payload's +examine+ when the payload responds to it, otherwise
# falls back to the job's name.
def to_s
  return @to_s if @to_s

  job = payload_object
  @to_s = if job.respond_to?(:examine)
            job.examine
          else
            name
          end
end