Class: Fanforce::Worker
- Inherits:
-
Object
- Object
- Fanforce::Worker
- Defined in:
- lib/fanforce/worker/worker.rb,
lib/fanforce/worker/version.rb
Defined Under Namespace
Classes: Timeout
Constant Summary collapse
- LOADED_AT =
Time.now
- MAX_EXECUTION_TIME =
3300
- VERSION =
'0.15.3'
Class Method Summary collapse
- .add_error(queue_id, error) ⇒ Object
- .current_job ⇒ Object
- .current_job=(job) ⇒ Object
- .current_params ⇒ Object
- .current_params=(params) ⇒ Object
- .current_queue_id ⇒ Object
- .current_queue_id=(queue_id) ⇒ Object
- .current_retries ⇒ Object
- .current_retries=(retries) ⇒ Object
- .current_worker_env ⇒ Object
- .current_worker_env=(env_vars) ⇒ Object
- .delete_job(job = nil) ⇒ Object
- .enqueue(queue_id, params, options = {}) ⇒ Object
- .handle_job_error(e, job, job_data) ⇒ Object
- .handle_job_loading_error(e, job, job_data) ⇒ Object
- .job_has_enough_time_to_run(min_execution_time) ⇒ Object
- .load_env ⇒ Object
- .retry(options) ⇒ Object
- .run(worker_data, min_execution_time = 300, &code_block) ⇒ Object
- .run_job(job, job_data, &code_block) ⇒ Object
- .set_env_vars(vars) ⇒ Object
- .worker_time_remaining ⇒ Object
Instance Method Summary collapse
- #add_error(queue_id, error) ⇒ Object
- #delete_error(queue_id, job_id, details_id) ⇒ Object
- #enqueue(queue_id, params, options = {}) ⇒ Object
- #error_details(queue_id, details_id) ⇒ Object
-
#initialize(opts = {}) ⇒ Worker
constructor
A new instance of Worker.
- #iron_cache ⇒ Object
- #iron_mq ⇒ Object
- #retry_error(queue_id, job_id, details_id) ⇒ Object
- #truncate(text, length = 130, truncate_string = "...") ⇒ Object
Constructor Details
#initialize(opts = {}) ⇒ Worker
Returns a new instance of Worker.
10 11 12 |
# File 'lib/fanforce/worker/worker.rb', line 10 def initialize(opts={}) @opts = opts end |
Class Method Details
.add_error(queue_id, error) ⇒ Object
95 96 97 |
# File 'lib/fanforce/worker/worker.rb', line 95 def self.add_error(queue_id, error) self.new.add_error(queue_id, error) end |
.current_job ⇒ Object
137 138 139 |
# File 'lib/fanforce/worker/worker.rb', line 137 def self.current_job @current_job end |
.current_job=(job) ⇒ Object
133 134 135 |
# File 'lib/fanforce/worker/worker.rb', line 133 def self.current_job=(job) @current_job = job end |
.current_params ⇒ Object
121 122 123 |
# File 'lib/fanforce/worker/worker.rb', line 121 def self.current_params @current_params end |
.current_params=(params) ⇒ Object
117 118 119 |
# File 'lib/fanforce/worker/worker.rb', line 117 def self.current_params=(params) @current_params = params end |
.current_queue_id ⇒ Object
105 106 107 |
# File 'lib/fanforce/worker/worker.rb', line 105 def self.current_queue_id @current_queue_id end |
.current_queue_id=(queue_id) ⇒ Object
101 102 103 |
# File 'lib/fanforce/worker/worker.rb', line 101 def self.current_queue_id=(queue_id) @current_queue_id = queue_id end |
.current_retries ⇒ Object
129 130 131 |
# File 'lib/fanforce/worker/worker.rb', line 129 def self.current_retries @current_retries end |
.current_retries=(retries) ⇒ Object
125 126 127 |
# File 'lib/fanforce/worker/worker.rb', line 125 def self.current_retries=(retries) @current_retries = retries end |
.current_worker_env ⇒ Object
113 114 115 |
# File 'lib/fanforce/worker/worker.rb', line 113 def self.current_worker_env @current_worker_env end |
.current_worker_env=(env_vars) ⇒ Object
109 110 111 |
# File 'lib/fanforce/worker/worker.rb', line 109 def self.current_worker_env=(env_vars) @current_worker_env = env_vars end |
.delete_job(job = nil) ⇒ Object
242 243 244 245 246 |
# File 'lib/fanforce/worker/worker.rb', line 242 def self.delete_job(job=nil) return if job.nil? and current_job.nil? (job || current_job).delete self.current_job = nil end |
.enqueue(queue_id, params, options = {}) ⇒ Object
91 92 93 |
# File 'lib/fanforce/worker/worker.rb', line 91 def self.enqueue(queue_id, params, ={}) self.new.enqueue(queue_id, params, ) end |
.handle_job_error(e, job, job_data) ⇒ Object
223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 |
# File 'lib/fanforce/worker/worker.rb', line 223 def self.handle_job_error(e, job, job_data) raise($!, "#{$!}: THERE IS NO JOB", $!.backtrace) if job.nil? error = job_data.merge( http_code: (e.code if e.respond_to?(:code)), exception: e.class.name, message: e., backtrace: e.backtrace, errored_at: Time.now, env_vars: current_worker_env ) error[:curl_command] = e.curl_command if e.respond_to?(:curl_command) self.delete_job(job) puts 'REMOVED JOB FROM QUEUE, AND SAVING TO ERROR CACHE...' puts error.to_json self.add_error current_queue_id, error end |
.handle_job_loading_error(e, job, job_data) ⇒ Object
215 216 217 218 219 220 221 |
# File 'lib/fanforce/worker/worker.rb', line 215 def self.handle_job_loading_error(e, job, job_data) raise($!, "#{$!}: THERE IS NO JOB", $!.backtrace) if job.nil? self.delete_job(job) puts 'REMOVED JOB FROM QUEUE, BUT COULD NOT SAVE TO ERROR CACHE...' raise($!, "#{$!}: #{job_data.to_json}", $!.backtrace) end |
.job_has_enough_time_to_run(min_execution_time) ⇒ Object
190 191 192 193 194 195 |
# File 'lib/fanforce/worker/worker.rb', line 190 def self.job_has_enough_time_to_run(min_execution_time) time_since_load = Time.now - LOADED_AT return false if time_since_load > MAX_EXECUTION_TIME return false if worker_time_remaining < min_execution_time return true end |
.load_env ⇒ Object
141 142 143 144 145 146 147 148 149 |
# File 'lib/fanforce/worker/worker.rb', line 141 def self.load_env if File.exists?('.developmentenv.rb') require '.developmentenv' elsif File.exists?('.stagingenv.rb') require '.stagingenv' elsif File.exists?('.productionenv.rb') require '.productionenv' end end |
.retry(options) ⇒ Object
197 198 199 |
# File 'lib/fanforce/worker/worker.rb', line 197 def self.retry() self.new.enqueue(current_queue_id, current_params, .merge(retries: current_retries + 1)) end |
.run(worker_data, min_execution_time = 300, &code_block) ⇒ Object
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 |
# File 'lib/fanforce/worker/worker.rb', line 151 def self.run(worker_data, min_execution_time=300, &code_block) raise "min_execution_time was set to #{min_execution_time}, which is #{min_execution_time - MAX_EXECUTION_TIME} seconds too long" if min_execution_time > MAX_EXECUTION_TIME puts 'LOADING WORKER ENV...' load_env require 'iron_mq' require 'iron_cache' require 'fanforce/api' require 'active_support/all' self.current_queue_id = worker_data['queue_id'] self.current_worker_env = worker_data['env_vars'] queue = IronMQ::Client.new.queue(current_queue_id) puts 'PROCESSING JOBS...' puts '------------------------------------------------------------------------------------' job_num = 0 job_data = nil while job_has_enough_time_to_run(min_execution_time) and (job = queue.get(timeout: 3600)) do puts "- JOB #{job_num+=1}: #{job.body}" Timeout::timeout(worker_time_remaining, Fanforce::Worker::Timeout) do job_data = nil job_data = Fanforce.decode_json(job.body) run_job(job, job_data, &code_block) end self.delete_job puts '------------------------------------------------------------------------------------' end self.delete_job puts 'WINDING DOWN WORKER!' rescue Exception => e handle_job_loading_error(e, job, job_data) end |
.run_job(job, job_data, &code_block) ⇒ Object
201 202 203 204 205 206 207 208 209 210 211 212 213 |
# File 'lib/fanforce/worker/worker.rb', line 201 def self.run_job(job, job_data, &code_block) trap('SIGHUP') do raise 'trapped SIGHUP' end self.current_job = job self.current_params = job_data[:params] self.current_retries = job_data[:retries] set_env_vars(current_worker_env) code_block.call(job_data[:params].clone, retries: job_data[:retries], queue_id: current_queue_id) self.delete_job(job) rescue Exception => e handle_job_error(e, job, job_data) end |
.set_env_vars(vars) ⇒ Object
248 249 250 |
# File 'lib/fanforce/worker/worker.rb', line 248 def self.set_env_vars(vars) vars.each {|k,v| ENV[k.to_s]=v } end |
.worker_time_remaining ⇒ Object
185 186 187 188 |
# File 'lib/fanforce/worker/worker.rb', line 185 def self.worker_time_remaining time_since_load = Time.now - LOADED_AT MAX_EXECUTION_TIME - time_since_load end |
Instance Method Details
#add_error(queue_id, error) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/fanforce/worker/worker.rb', line 30 def add_error(queue_id, error) require 'uuidtools' details_id = UUIDTools::UUID.random_create.to_s iron_cache.cache("#{queue_id}-ERRORS").put(details_id, error.to_json) iron_mq.queue("#{queue_id}-ERRORS").post({ details_id: details_id, http_code: error[:http_code], exception: truncate(error[:exception]), message: truncate(error[:message].to_s), params: truncate(error[:params].to_json), errored_at: error[:errored_at], retries: error[:retries], env_vars: truncate(error[:env_vars].to_json), curl_command: truncate(error[:curl_command].to_s) }.to_json) rescue => e puts '-----------------------------------------------------' puts 'WORKER ERROR WHILE RECOVERING FROM JOB ERROR:' puts e. puts e.backtrace puts '-----------------------------------------------------' puts 'JOB ERROR:' puts "details_id: #{details_id}" puts "http_code: #{error[:http_code]}" puts "exception: #{truncate(error[:exception])}" puts "message: #{truncate(error[:message].to_s)}" puts "params: #{truncate(error[:params].to_json)}" puts "errored_at: #{error[:errored_at]}" puts "retries: #{error[:retries]}" puts "env_vars: #{truncate(error[:env_vars].to_json)}" puts "curl_command: #{truncate(error[:curl_command].to_s)}" end |
#delete_error(queue_id, job_id, details_id) ⇒ Object
63 64 65 66 |
# File 'lib/fanforce/worker/worker.rb', line 63 def delete_error(queue_id, job_id, details_id) iron_mq.queue("#{queue_id}-ERRORS").delete(job_id) iron_cache.cache("#{queue_id}-ERRORS").delete(details_id) end |
#enqueue(queue_id, params, options = {}) ⇒ Object
24 25 26 27 28 |
# File 'lib/fanforce/worker/worker.rb', line 24 def enqueue(queue_id, params, ={}) retries = ([:retries].present?) ? .delete(:retries) : 0 raise 'Params being sent to the queue must be a Hash' if !params.is_a?(Hash) iron_mq.queue(queue_id).post({params: params, retries: retries}.to_json, ) end |
#error_details(queue_id, details_id) ⇒ Object
68 69 70 71 |
# File 'lib/fanforce/worker/worker.rb', line 68 def error_details(queue_id, details_id) cache = iron_cache.cache("#{queue_id}-ERRORS").get(details_id) MultiJson.load(cache.value, :symbolize_keys => true) end |
#iron_cache ⇒ Object
19 20 21 22 |
# File 'lib/fanforce/worker/worker.rb', line 19 def iron_cache require 'iron_cache' @iron_cache ||= IronCache::Client.new(:token => @opts[:token] || ENV['IRON_TOKEN'], :project_id => @opts[:project_id] || ENV['IRON_PROJECT_ID']) end |
#iron_mq ⇒ Object
14 15 16 17 |
# File 'lib/fanforce/worker/worker.rb', line 14 def iron_mq require 'iron_mq' @iron_mq ||= IronMQ::Client.new(:token => @opts[:token] || ENV['IRON_TOKEN'], :project_id => @opts[:project_id] || ENV['IRON_PROJECT_ID']) end |
#retry_error(queue_id, job_id, details_id) ⇒ Object
73 74 75 76 77 78 79 |
# File 'lib/fanforce/worker/worker.rb', line 73 def retry_error(queue_id, job_id, details_id) cache = iron_cache.cache("#{queue_id}-ERRORS").get(details_id) cache_data = MultiJson.load(cache.value, :symbolize_keys => true) enqueue(queue_id, cache_data[:params], :retries => cache_data[:retries] + 1) cache.delete and iron_mq.queue("#{queue_id}-ERRORS").delete(job_id) end |
#truncate(text, length = 130, truncate_string = "...") ⇒ Object
81 82 83 84 85 86 87 |
# File 'lib/fanforce/worker/worker.rb', line 81 def truncate(text, length=130, truncate_string="...") if text l = length - truncate_string.chars.to_a.length chars = text.chars.to_a (chars.length > length ? chars[0...l].join('') + truncate_string : text).to_s end end |