Class: DistributedResqueWorker::ResqueWorker

Inherits:
Object
Extended by:
ResqueFailure
Defined in:
lib/distributed_resque_worker.rb

Overview

ResqueWorker splits a list of work items into CHUNK_SIZE-sized chunks, enqueues each chunk as a Resque job on a dedicated per-run queue, uploads each chunk's CSV output to S3 as it completes, and merges the intermediate files into a single final CSV in a post-processing job once all chunks are done.

Constant Summary

CHUNK_SIZE = 100

Class Method Summary

Instance Method Summary

Methods included from ResqueFailure

on_failure_logging

Constructor Details

#initialize(queue_name, bucket, root) ⇒ ResqueWorker

Returns a new instance of ResqueWorker.



# File 'lib/distributed_resque_worker.rb', line 25

def initialize(queue_name, bucket, root)
  @queue = "#{queue_name}_#{Time.now.to_i}_#{Random.rand(1000000)}".to_sym
  @bucket = bucket
  @root = root
  FileUtils.mkdir_p("#{root}/tmp/#{@queue}")
end
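
The queue name embeds the current Unix timestamp and a random suffix, so every run gets its own queue and a matching tmp directory under root. Note that process_chunk later constantizes the part of the queue name before the first underscore, so queue_name should be the name of a class implementing the *_chunk and *_post hooks (see process_chunk below). A minimal sketch, with illustrative names and paths:

worker = DistributedResqueWorker::ResqueWorker.new('Report', 'my-bucket', '/app')
# @queue is e.g. :Report_1700000000_123456, and the directory
# /app/tmp/Report_1700000000_123456 now exists.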

Class Method Details

.clean_up(queue_name, root) ⇒ Object



# File 'lib/distributed_resque_worker.rb', line 164

def clean_up(queue_name, root)
  FileUtils.remove_dir("#{root}/tmp/#{queue_name}")

  delete_queue(queue_name)
  Resque.logger.info('Cleanup Done!')
end

.delete_intermediate_s3_files(work_name, bucket) ⇒ Object



# File 'lib/distributed_resque_worker.rb', line 142

def delete_intermediate_s3_files(work_name, bucket)
  aws_bucket = AwsHelper.bucket(bucket)
  folder = "resque_worker/#{work_name}/"
  s3_object = aws_bucket.objects.with_prefix(folder)
  s3_file_names = s3_object.collect(&:key)
  s3_file_names.each do |item|
    AwsHelper.s3_delete(item, bucket)
  end
end

.delete_queue(queue_name) ⇒ Object



# File 'lib/distributed_resque_worker.rb', line 171

def delete_queue(queue_name)
  Resque.queues.each do |queue|
    Resque.remove_queue(queue) if queue == queue_name
  end
end

.download_intermediate_files(work_name, bucket, root) ⇒ Object



# File 'lib/distributed_resque_worker.rb', line 127

def download_intermediate_files(work_name, bucket, root)
  aws_bucket = AwsHelper.bucket(bucket)
  folder = "resque_worker/#{work_name}/"
  s3_object = aws_bucket.objects.with_prefix(folder)
  s3_file_names = s3_object.collect(&:key)
  s3_file_names.each do |filename|
    # Keys look like resque_worker/<work_name>/<basename>, so index 2 is
    # the basename; keys without one (the folder placeholder) are skipped.
    local_file_name = filename.split('/')
    next unless local_file_name[2]

    download_file_path = "#{root}/tmp/#{work_name}/#{local_file_name[2]}"
    Resque.logger.info("download_file_path #{download_file_path} ")
    AwsHelper.s3_download_file(filename, download_file_path, bucket)
  end
end

.enqueue_post_processor(args) ⇒ Object



# File 'lib/distributed_resque_worker.rb', line 76

def enqueue_post_processor(args)
  Resque.logger.info('start enqueue_post_processor')
  input = args.symbolize_keys!
  details = { type: 'post_processor', work_name: input[:work_name],
              bucket: input[:bucket], method: input[:method],
              root: input[:root], opts: input[:opts] }
  Resque.enqueue_to(input[:work_name], ResqueWorker, details)
  Resque.logger.info('finished enqueue_post_processor')
end
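
The post-processing job is pushed onto the same per-run queue as the chunk jobs, so it is picked up once the chunk jobs ahead of it have drained.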

.merge_intermediate_files(work_name, final_file) ⇒ Object



# File 'lib/distributed_resque_worker.rb', line 70

def merge_intermediate_files(work_name, final_file)
  # Chunk files are resolved relative to the process working directory.
  files = "tmp/#{work_name}/#{work_name}_*.csv"
  # Keep the header row of the first file only: NR == 1 matches the very
  # first line overall, FNR > 1 matches non-header lines of every file.
  cmd = "awk '(NR == 1) || (FNR > 1)' #{files} > #{final_file}"
  system(cmd)
end
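
For environments where shelling out to awk is undesirable, a pure-Ruby equivalent is a few lines. This is a sketch, not part of the class; it assumes every chunk file shares the same header row:

def merge_csvs(glob_pattern, final_file)
  File.open(final_file, 'w') do |out|
    Dir.glob(glob_pattern).sort.each_with_index do |path, i|
      lines = File.readlines(path)
      # Keep the header from the first file only.
      out.write(i.zero? ? lines.join : lines.drop(1).join)
    end
  end
end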

.perform(args) ⇒ Object



# File 'lib/distributed_resque_worker.rb', line 47

def self.perform(args)
  # Track how many chunk jobs are still outstanding for this run.
  redis_key = resque_worker_redis_key(args['work_name'])
  no_jobs = resque_redis.redis.get(redis_key)
  Resque.logger.info("No of jobs remaining => #{no_jobs}")
  if args['type'] == 'processor'
    process_chunk(args)
    # The last chunk to finish enqueues the post-processing job.
    if resque_redis.redis.get(redis_key).to_i.zero?
      enqueue_post_processor(args)
    end
  elsif args['type'] == 'post_processor'
    post_processing(args)
  end
end
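
For reference, these are the two payload shapes perform dispatches on, as built by chunk_work_and_enqueue and enqueue_post_processor (Resque round-trips arguments through JSON, hence the string keys; the work name, method, and paths shown are illustrative):

# Chunk job:
{ 'type' => 'processor', 'work_name' => 'Report_1700000000_123456',
  'chunk' => [...], 'index' => 0, 'root' => '/app',
  'bucket' => 'my-bucket', 'method' => 'export', 'opts' => {} }

# Post-processing job:
{ 'type' => 'post_processor', 'work_name' => 'Report_1700000000_123456',
  'bucket' => 'my-bucket', 'method' => 'export',
  'root' => '/app', 'opts' => {} }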

.post_processing(args) ⇒ Object



# File 'lib/distributed_resque_worker.rb', line 109

def post_processing(args)
  input = args.symbolize_keys!
  work_name = input[:work_name]
  root = input[:root]
  final_tmp_file = "#{root}/tmp/#{work_name}/#{work_name}_final.csv"
  Resque.logger.info("start post_processing #{input}")
  begin
    download_intermediate_files(work_name, input[:bucket], root)
    delete_intermediate_s3_files(work_name, input[:bucket])
    merge_intermediate_files(work_name, final_tmp_file)
    upload_final_file_to_s3_and_send(input, final_tmp_file)
  rescue StandardError
    # $ERROR_INFO is the English library's alias for $!; the error is
    # logged and swallowed rather than re-raised.
    Resque.logger.error($ERROR_INFO)
    nil
  end
  Resque.logger.info('finished post_processing')
end

.process_chunk(args) ⇒ Object



# File 'lib/distributed_resque_worker.rb', line 86

def process_chunk(args)
  input = args.symbolize_keys!
  # The worker class is derived from the queue name ("Report_<ts>_<rand>"
  # => Report) and must respond to "#{method}_chunk".
  method_chunk = "#{input[:method]}_chunk".to_sym
  worker_class = input[:work_name].split('_').first
  worker = worker_class.constantize
  path = "#{input[:work_name]}/#{input[:work_name]}_#{input[:index]}.csv"
  filename = "#{input[:root]}/tmp/#{path}"
  worker.send(method_chunk, input[:chunk], filename, input[:opts])
  store_to_s3_delete_local_copy(path, filename, input[:bucket])
  # One fewer chunk outstanding for this run.
  resque_redis.redis.decr(resque_worker_redis_key(input[:work_name]))
end
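
A hypothetical worker class satisfying this contract (the Report name and CSV columns are assumptions; only the *_chunk and *_post hooks matter):

require 'csv'

class Report
  # Called once per chunk; writes this chunk's rows to the given file.
  def self.export_chunk(chunk, filename, opts)
    CSV.open(filename, 'w') do |csv|
      csv << %w[id name]                       # header row
      chunk.each { |row| csv << [row['id'], row['name']] }
    end
  end

  # Called once after the merged file is uploaded; receives the S3 link.
  def self.export_post(final_file_link, opts)
    Resque.logger.info("final report at #{final_file_link}")
  end
end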

.resque_redis ⇒ Object



# File 'lib/distributed_resque_worker.rb', line 62

def resque_redis
  Resque.redis
end

.resque_worker_redis_key(work_name) ⇒ Object



# File 'lib/distributed_resque_worker.rb', line 66

def resque_worker_redis_key(work_name)
  "DistributedResqueWorker:#{work_name}"
end
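
For example (work name illustrative):

ResqueWorker.resque_worker_redis_key('Report_1700000000_123456')
# => "DistributedResqueWorker:Report_1700000000_123456"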

.store_to_s3_delete_local_copy(path, filename, bucket) ⇒ Object



# File 'lib/distributed_resque_worker.rb', line 98

def store_to_s3_delete_local_copy(path, filename, bucket)
  s3_name = "resque_worker/#{path}"
  begin
    AwsHelper.s3_store_file(s3_name, filename, bucket)
    File.delete(filename)
  rescue StandardError
    Resque.logger.error($ERROR_INFO)
    nil
  end
end

.upload_final_file_to_s3_and_send(input, final_tmp_file) ⇒ Object



# File 'lib/distributed_resque_worker.rb', line 152

def upload_final_file_to_s3_and_send(input, final_tmp_file)
  work_name = input[:work_name]
  s3_name = "resque_worker/#{work_name}/#{work_name}_final.csv"
  final_file_link = AwsHelper.s3_store_file(s3_name, final_tmp_file,
                                            input[:bucket])
  method_post = "#{input[:method]}_post".to_sym
  worker_class = input[:work_name].split('_').first
  worker = worker_class.constantize
  worker.send(method_post, final_file_link, input[:opts])
  clean_up(work_name, input[:root])
end

Instance Method Details

#chunk_work_and_enqueue(work_list, method, opts) ⇒ Object



# File 'lib/distributed_resque_worker.rb', line 32

def chunk_work_and_enqueue(work_list, method, opts)
  total_jobs = (work_list.length.to_f / CHUNK_SIZE).ceil
  total_jobs = 1 if total_jobs.zero?
  Resque.logger.info("total_jobs #{total_jobs}")
  # Seed the outstanding-jobs counter that perform/process_chunk decrement.
  ResqueWorker.resque_redis.redis.set(
    ResqueWorker.resque_worker_redis_key(@queue), total_jobs
  )
  work_list.each_slice(CHUNK_SIZE).each_with_index do |chunk, index|
    details = { work_name: @queue, chunk: chunk, index: index,
                type: 'processor', root: @root.to_s,
                bucket: @bucket, method: method, opts: opts }
    Resque.enqueue_to(@queue, ResqueWorker, details)
  end
end
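
Putting it together, a hedged end-to-end sketch (User, Report, the bucket, and the root path are all assumptions; Report is the hypothetical class shown under process_chunk):

# Split 10,000 rows into 100-row chunks and fan them out to Resque.
rows = User.limit(10_000).map { |u| { 'id' => u.id, 'name' => u.name } }
worker = DistributedResqueWorker::ResqueWorker.new('Report', 'my-bucket', Rails.root.to_s)
worker.chunk_work_and_enqueue(rows, 'export', {})
# Each chunk job runs Report.export_chunk and uploads its CSV to S3; the
# last one to finish triggers the post-processing job, which merges the
# files and calls Report.export_post with the final S3 link.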