Class: DistributedResqueWorker::ResqueWorker
- Inherits: Object
  - Object
  - DistributedResqueWorker::ResqueWorker
- Extended by: ResqueFailure
- Defined in: lib/distributed_resque_worker.rb
Overview
ResqueWorker splits a large work list into fixed-size chunks, runs each chunk as its own Resque job, uploads every chunk's CSV output to S3, and merges the intermediate files into a single final CSV in a post-processing job.
Constant Summary
- CHUNK_SIZE = 100
Class Method Summary
- .clean_up(queue_name, root) ⇒ Object
- .delete_intermediate_s3_files(work_name, bucket) ⇒ Object
- .delete_queue(queue_name) ⇒ Object
- .download_intermediate_files(work_name, bucket, root) ⇒ Object
- .enqueue_post_processor(args) ⇒ Object
- .merge_intermediate_files(work_name, final_file) ⇒ Object
- .perform(args) ⇒ Object
- .post_processing(args) ⇒ Object
- .process_chunk(args) ⇒ Object
- .resque_redis ⇒ Object
- .resque_worker_redis_key(work_name) ⇒ Object
- .store_to_s3_delete_local_copy(path, filename, bucket) ⇒ Object
- .upload_final_file_to_s3_and_send(input, final_tmp_file) ⇒ Object
Instance Method Summary
- #chunk_work_and_enqueue(work_list, method, opts) ⇒ Object
- #initialize(queue_name, bucket, root) ⇒ ResqueWorker (constructor)
  A new instance of ResqueWorker.
Methods included from ResqueFailure
Constructor Details
#initialize(queue_name, bucket, root) ⇒ ResqueWorker
Returns a new instance of ResqueWorker.
# File 'lib/distributed_resque_worker.rb', line 25

def initialize(queue_name, bucket, root)
  @queue = "#{queue_name}_#{Time.now.to_i}_#{Random.rand(1000000)}".to_sym
  @bucket = bucket
  @root = root
  FileUtils.mkdir_p("#{root}/tmp/#{@queue}")
end
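A minimal construction sketch: the queue name is also used later to recover the worker class, so it should be the class name itself. The bucket name and root path below are placeholder assumptions, not values from the gem.

require 'distributed_resque_worker'

# 'my-app-bucket' and Dir.pwd are illustrative; any S3 bucket and writable root work.
worker = DistributedResqueWorker::ResqueWorker.new('ReportWorker', 'my-app-bucket', Dir.pwd)
# Builds a unique queue name such as :ReportWorker_1700000000_123456 and
# creates the matching <root>/tmp/<queue> scratch directory.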
Class Method Details
.clean_up(queue_name, root) ⇒ Object
# File 'lib/distributed_resque_worker.rb', line 164

def clean_up(queue_name, root)
  FileUtils.remove_dir("#{root}/tmp/#{queue_name}")
  delete_queue(queue_name)
  Resque.logger.info('Cleanup Done!')
end
.delete_intermediate_s3_files(work_name, bucket) ⇒ Object
# File 'lib/distributed_resque_worker.rb', line 142

def delete_intermediate_s3_files(work_name, bucket)
  aws_bucket = AwsHelper.bucket(bucket)
  folder = "resque_worker/#{work_name}/"
  s3_object = aws_bucket.objects.with_prefix(folder)
  s3_file_names = s3_object.collect(&:key)
  s3_file_names.each do |item|
    AwsHelper.s3_delete(item, bucket)
  end
end
.delete_queue(queue_name) ⇒ Object
# File 'lib/distributed_resque_worker.rb', line 171

def delete_queue(queue_name)
  Resque.queues.each do |queue|
    Resque.remove_queue(queue) if queue == queue_name
  end
end
.download_intermediate_files(work_name, bucket, root) ⇒ Object
# File 'lib/distributed_resque_worker.rb', line 127

def download_intermediate_files(work_name, bucket, root)
  aws_bucket = AwsHelper.bucket(bucket)
  folder = "resque_worker/#{work_name}/"
  s3_object = aws_bucket.objects.with_prefix(folder)
  s3_file_names = s3_object.collect(&:key)
  s3_file_names.each do |filename|
    local_file_name = filename.split('/')
    next unless local_file_name[2]
    download_file_path = "#{root}/tmp/#{work_name}/#{local_file_name[2]}"
    Resque.logger.info("download_file_path #{download_file_path} ")
    AwsHelper.s3_download_file(filename, download_file_path, bucket)
  end
end
.enqueue_post_processor(args) ⇒ Object
# File 'lib/distributed_resque_worker.rb', line 76

def enqueue_post_processor(args)
  Resque.logger.info('start enqueue_post_processor')
  input = args.symbolize_keys!
  details = { type: 'post_processor', work_name: input[:work_name],
              bucket: input[:bucket], method: input[:method],
              root: input[:root], opts: input[:opts] }
  Resque.enqueue_to(input[:work_name], ResqueWorker, details)
  Resque.logger.info('finished enqueue_post_processor')
end
.merge_intermediate_files(work_name, final_file) ⇒ Object
# File 'lib/distributed_resque_worker.rb', line 70

def merge_intermediate_files(work_name, final_file)
  files = "tmp/#{work_name}/#{work_name}_*.csv"
  cmd = "awk '(NR == 1) || (FNR > 1)' #{files} > #{final_file}"
  system(`#{cmd}`)
end
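The awk filter keeps the header row of the first chunk file and skips the first line of every later one (NR counts lines across all inputs, FNR restarts per file). A rough pure-Ruby equivalent of that merge, for illustration only; the file paths are assumptions.

# Illustrative sketch of the same merge: keep only the first file's header row.
chunk_files = Dir.glob('tmp/my_work/my_work_*.csv').sort
File.open('tmp/my_work/my_work_final.csv', 'w') do |out|
  chunk_files.each_with_index do |path, file_idx|
    File.foreach(path).with_index do |line, line_idx|
      out.write(line) if file_idx.zero? || line_idx > 0  # drop headers after the first file
    end
  end
end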
.perform(args) ⇒ Object
# File 'lib/distributed_resque_worker.rb', line 47

def self.perform(args)
  redis_key = resque_worker_redis_key(args['work_name'])
  no_jobs = resque_redis.redis.get(redis_key)
  Resque.logger.info("No of jobs remaining => #{no_jobs}")
  if args['type'] == 'processor'
    process_chunk(args)
    if resque_redis.redis.get(redis_key).to_i.zero?
      enqueue_post_processor(args)
    end
  elsif args['type'] == 'post_processor'
    post_processing(args)
  end
end
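perform dispatches on args['type']. The two payload shapes below mirror what #chunk_work_and_enqueue and .enqueue_post_processor build; the concrete values are illustrative assumptions, and keys arrive as strings after Resque's JSON round-trip.

# 'processor' payload: one slice of the work list (example values only)
{ 'type' => 'processor', 'work_name' => 'ReportWorker_1700000000_123456',
  'chunk' => [1, 2, 3], 'index' => 0, 'root' => '/srv/app',
  'bucket' => 'my-app-bucket', 'method' => 'export', 'opts' => { 'label' => 'demo' } }

# 'post_processor' payload: enqueued by the chunk job that drives the Redis counter to zero
{ 'type' => 'post_processor', 'work_name' => 'ReportWorker_1700000000_123456',
  'root' => '/srv/app', 'bucket' => 'my-app-bucket',
  'method' => 'export', 'opts' => { 'label' => 'demo' } }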
.post_processing(args) ⇒ Object
# File 'lib/distributed_resque_worker.rb', line 109

def post_processing(args)
  input = args.symbolize_keys!
  work_name = input[:work_name]
  root = input[:root]
  final_tmp_file = "#{root}/tmp/#{work_name}/#{work_name}_final.csv"
  Resque.logger.info("start post_processing #{input}")
  begin
    download_intermediate_files(work_name, input[:bucket], root)
    delete_intermediate_s3_files(work_name, input[:bucket])
    merge_intermediate_files(work_name, final_tmp_file)
    upload_final_file_to_s3_and_send(input, final_tmp_file)
  rescue StandardError
    Resque.logger.error($ERROR_INFO)
    nil
  end
  Resque.logger.info('finished post_processing ')
end
.process_chunk(args) ⇒ Object
# File 'lib/distributed_resque_worker.rb', line 86

def process_chunk(args)
  input = args.symbolize_keys!
  method_chunk = "#{input[:method]}_chunk".to_sym
  worker_class = input[:work_name].split('_').first
  worker = worker_class.constantize
  path = "#{input[:work_name]}/#{input[:work_name]}_#{input[:index]}.csv"
  filename = "#{input[:root]}/tmp/#{path}"
  worker.send(method_chunk, input[:chunk], filename, input[:opts])
  store_to_s3_delete_local_copy(path, filename, input[:bucket])
  resque_redis.redis.decr(resque_worker_redis_key(input[:work_name]))
end
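process_chunk recovers the worker class from the part of work_name before the first underscore and calls "#{method}_chunk" on it, so the queue name passed to the constructor must be a constantizable class name without underscores. A sketch of the hook that class is expected to provide; the class, columns, and opts key are assumptions.

require 'csv'

class ReportWorker
  # Called once per chunk; must write this chunk's rows, with a header, to filename.
  # The duplicate headers are removed later by .merge_intermediate_files.
  def self.export_chunk(chunk, filename, opts)
    CSV.open(filename, 'w') do |csv|
      csv << %w[id label]
      chunk.each { |item| csv << [item, opts['label']] }
    end
  end
end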
.resque_redis ⇒ Object
# File 'lib/distributed_resque_worker.rb', line 62

def resque_redis
  Resque.redis
end
.resque_worker_redis_key(work_name) ⇒ Object
# File 'lib/distributed_resque_worker.rb', line 66

def resque_worker_redis_key(work_name)
  "DistributedResqueWorker:#{work_name}"
end
.store_to_s3_delete_local_copy(path, filename, bucket) ⇒ Object
# File 'lib/distributed_resque_worker.rb', line 98

def store_to_s3_delete_local_copy(path, filename, bucket)
  s3_name = "resque_worker/#{path}"
  begin
    AwsHelper.s3_store_file(s3_name, filename, bucket)
    File.delete(filename)
  rescue StandardError
    Resque.logger.error($ERROR_INFO)
    nil
  end
end
.upload_final_file_to_s3_and_send(input, final_tmp_file) ⇒ Object
# File 'lib/distributed_resque_worker.rb', line 152

def upload_final_file_to_s3_and_send(input, final_tmp_file)
  work_name = input[:work_name]
  s3_name = "resque_worker/#{work_name}/#{work_name}_final.csv"
  final_file_link = AwsHelper.s3_store_file(s3_name, final_tmp_file, input[:bucket])
  method_post = "#{input[:method]}_post".to_sym
  worker_class = input[:work_name].split('_').first
  worker = worker_class.constantize
  worker.send(method_post, final_file_link, input[:opts])
  clean_up(work_name, input[:root])
end
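After uploading the merged CSV, the gem calls "#{method}_post" on the same class that provided "#{method}_chunk", passing the S3 link returned by AwsHelper.s3_store_file. A sketch of that hook; the class name and the notification step are assumptions, not part of the gem.

class ReportWorker
  # Called once at the end of post-processing with the merged file's S3 link.
  def self.export_post(final_file_link, opts)
    Resque.logger.info("final report for #{opts['label']} at #{final_file_link}")
    # e.g. hand the link to a mailer or notification job here (assumption)
  end
end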
Instance Method Details
#chunk_work_and_enqueue(work_list, method, opts) ⇒ Object
# File 'lib/distributed_resque_worker.rb', line 32

def chunk_work_and_enqueue(work_list, method, opts)
  total_jobs = (work_list.length.to_f / CHUNK_SIZE).ceil
  total_jobs = 1 if total_jobs.zero?
  Resque.logger.info("total_jobs #{total_jobs}")
  ResqueWorker.resque_redis.redis.set(
    ResqueWorker.resque_worker_redis_key(@queue), total_jobs
  )
  work_list.each_slice(CHUNK_SIZE).each_with_index do |chunk, index|
    details = { work_name: @queue, chunk: chunk, index: index,
                type: 'processor', root: @root.to_s, bucket: @bucket,
                method: method, opts: opts }
    Resque.enqueue_to(@queue, ResqueWorker, details)
  end
end
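A minimal end-to-end sketch, assuming a ReportWorker class like the ones sketched above that defines export_chunk and export_post; the bucket name and work list are illustrative assumptions.

# Illustrative only: 'my-app-bucket' and the work list are placeholders.
worker = DistributedResqueWorker::ResqueWorker.new('ReportWorker', 'my-app-bucket', Dir.pwd)
worker.chunk_work_and_enqueue((1..1_000).to_a, 'export', 'label' => 'demo')
# 1,000 items at CHUNK_SIZE 100 => 10 'processor' jobs on the new queue;
# the job that drives the Redis counter to zero enqueues the 'post_processor' job,
# which merges the chunk CSVs and hands the final S3 link to ReportWorker.export_post.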