Class: Steve::QueuedJob

Inherits:
ActiveRecord::Base
  • Object
show all
Defined in:
lib/steve/queued_job.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.cleanup(age = 5.days.ago) ⇒ Object

Remove old completed jobs from the database



207
208
209
# File 'lib/steve/queued_job.rb', line 207

def self.cleanup(age = 5.days.ago)
  self.delete_all(["status = 'completed' and run_at < ?", age])
end

.execute_jobs(queue = '*', limit = 5) ⇒ Object

Execute a new job from the queue. Returns true if a job was executed, or false if a job was not found or we couldn’t obtain locks for them.



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/steve/queued_job.rb', line 46

def self.execute_jobs(queue = '*', limit = 5)
  pending_jobs = ActiveRecord::Base.silence do
    jobs = self.where(:status => ['pending', 'delayed'], :worker => nil).where(["run_at <= ?", Time.now.utc]).order("priority asc").limit(5)
    jobs = jobs.where(:queue => queue) unless queue.nil? or queue == '*'
    jobs.all
  end

  jobs_executed = Array.new
  for job in pending_jobs.sort_by { rand() }
    Steve.log "[#{job.id}] Attempt to aquire lock"
    if job.lock 
      Steve.log "[#{job.id}] Lock acquired"
      ActiveRecord::Base.remove_connection
      if @child = fork
        rand
        Steve.log "[#{job.id}] Forked to #{@child}"
        $0 = "sj: forked to #{@child} at #{Time.now.utc.to_s(:db)}" unless Steve.keep_parent_process_name
        ActiveRecord::Base.establish_connection
        Process.wait
      else
        Steve.log "[#{job.id}] Executing"
        $0 = "sj: executing job ##{job.id} since #{Time.now.utc.to_s(:db)}"
        ActiveRecord::Base.establish_connection
        Steve.after_job_fork.call if Steve.after_job_fork.is_a?(Proc)
        job.execute
        exit
      end
      jobs_executed << job
    else
      Steve.log "[#{job.id}] Lock could not be acquired. Moving on."
    end
  end
  jobs_executed
end

.queue(klass, params = {}, &block) ⇒ Object

Queue a new job for processing. Returns true or false depending whether the job has been queued or not.



34
35
36
37
38
39
40
41
42
# File 'lib/steve/queued_job.rb', line 34

def self.queue(klass, params = {}, &block)
  job = self.new
  job.job = klass.to_s
  job.params = params
  job.queue = klass.instance_variable_get('@queue')
  job.priority = klass.instance_variable_get('@priority')
  block.call(job) if block_given?
  job.save
end

Instance Method Details

#archive_job?Boolean

In a state appropriate to be archived?

Returns:

  • (Boolean)


198
199
200
# File 'lib/steve/queued_job.rb', line 198

def archive_job?
  ['failed', 'completed'].include?(self.status)
end

#associate_with(object) ⇒ Object

Associate this job with the pased active record object



192
193
194
195
# File 'lib/steve/queued_job.rb', line 192

def associate_with(object)
  self.associated_object = object
  self.save(:validate => false)
end

#associated_objectObject

Can belong to another active record object?



13
# File 'lib/steve/queued_job.rb', line 13

belongs_to :associated_object, :polymorphic => true

#delay!(delay_time = 30.seconds) ⇒ Object

Delay this job by the time specified



182
183
184
185
186
187
188
189
# File 'lib/steve/queued_job.rb', line 182

def delay!(delay_time = 30.seconds)
  self.status = 'delayed'
  self.run_at = Time.now.utc + delay_time
  self.started_at = nil
  self.worker = nil
  self.retries += 1
  self.save(:validate => false)
end

#executeObject

Execute this job, catching any errors if they occur and ensuring the job is started & finished as appropriate.



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/steve/queued_job.rb', line 83

def execute
  job = self.job.constantize.new(self)
  if job.respond_to?(:perform)
    start!
    STDOUT.reopen(output_file) && STDERR.reopen(output_file)
    begin
      job.perform
      success!
      Steve.log "[#{self.id}] Succeeded"
    rescue Steve::Job::Delay => e
      max_attempts = (Steve.max_job_retries || 5)
      if self.retries >= max_attempts
        fail!("#{e.message} after #{max_attempts} attempt(s)")
        Steve.log "[#{self.id}] Failed after #{max_attempts} attempt(s): #{e.message}"
      else
        self.error = e.message
        delay!
        Steve.log "[#{self.id}] Delayed ('#{e.message}')"            
      end
    rescue Timeout::Error
      fail!('Timed out')
      Steve.log "[#{self.id}] Timed out: #{e.to_s}"
    rescue => e
      if e.is_a?(Steve::Job::Error)
        fail!(e.message)
      else
        if defined?(Airbrake)
          Airbrake.notify(e, :component => self.job.to_s, :action => self.id.to_s, :parameters => self.params)
        end
        fail!([e.to_s, e.backtrace].join("\n"))
      end
      Steve.log "[#{self.id}] Failed: #{e.to_s}"
    end
  else
    fail! "#{self.id} did not respond to 'perform'"
    Steve.log "[#{self.id}] Failed: does not respond to 'perform'"
    return false
  end
ensure
  STDOUT.flush && STDERR.flush
  if File.exist?(output_file)
    self.output = File.read(output_file)
    FileUtils.rm(output_file)
  end
  self.save(:validate => false)
end

#fail!(message) ⇒ Object

Mark this job as failed.



160
161
162
163
164
# File 'lib/steve/queued_job.rb', line 160

def fail!(message)
  self.error = message
  self.status = 'failed'
  finish!
end

#finish!Object

Mark this job as finished



167
168
169
# File 'lib/steve/queued_job.rb', line 167

def finish!
  self.finished_at = Time.now.utc
end

#lockObject

Get a lock on this job. Returns true if the lock was successful otherwise it returns false.



143
144
145
146
147
148
149
150
151
# File 'lib/steve/queued_job.rb', line 143

def lock
  rows = self.class.update_all({:worker => Steve.worker_name}, {:id => self.id, :worker => nil})
  if rows == 1
    self.worker = Steve.worker_name
    return true
  else
    return false
  end
end

#outputObject

Get the output of a job, even if the job is in progress, live output won’t work very well in a multiple app-server environment



133
134
135
136
137
138
139
# File 'lib/steve/queued_job.rb', line 133

def output
  if read_attribute(:output)
    read_attribute(:output)
  elsif File.exist?(output_file)
    File.read(output_file) 
  end
end

#output_fileObject



202
203
204
# File 'lib/steve/queued_job.rb', line 202

def output_file
  @output_file ||= File.join('', 'tmp', "steve-job-#{self.id}")
end

#paramsObject

Serialize the options



10
# File 'lib/steve/queued_job.rb', line 10

serialize :params

#pendingObject

Scopes



16
# File 'lib/steve/queued_job.rb', line 16

scope :pending, lambda { where(:status => ['pending', 'delayed']) }

#start!Object

Mark this job as started



172
173
174
175
176
177
178
179
# File 'lib/steve/queued_job.rb', line 172

def start!
  self.error = nil
  self.finished_at = nil
  self.started_at = Time.now.utc
  self.status = 'running'
  self.job_pid = Process.pid
  self.save(:validate => false)
end

#success!Object

Mark this job as succeeded successfully.



154
155
156
157
# File 'lib/steve/queued_job.rb', line 154

def success!
  self.status = 'completed'
  finish!
end