Class: SimpleMapReduce::Server::JobTracker
- Inherits:
- Sinatra::Base
- Inheritance hierarchy: Object < Sinatra::Base < SimpleMapReduce::Server::JobTracker
- Defined in:
- lib/simple_map_reduce/server/job_tracker.rb
Class Attribute Summary collapse
-
.config ⇒ Object
Returns the value of attribute config.
-
.jobs ⇒ Object
readonly
Returns the value of attribute jobs.
-
.workers ⇒ Object
readonly
Returns the value of attribute workers.
Class Method Summary collapse
- .check_s3_access ⇒ Object
- .create_s3_buckets_if_not_existing ⇒ Object
- .fetch_available_workers(worker_size = 1) ⇒ Object
- .job_manager ⇒ Object
- .logger ⇒ Object
- .mutex ⇒ Object
- .quit! ⇒ Object
- .register_job(map_script:, map_class_name:, reduce_script:, reduce_class_name:, job_input_bucket_name:, job_input_directory_path:, job_output_bucket_name:, job_output_directory_path:, map_worker:) ⇒ Object
- .register_worker(url:) ⇒ Object
- .s3_client ⇒ Object
- .setup_job_tracker ⇒ Object
- .store_worker(worker) ⇒ Object
Class Attribute Details
.config ⇒ Object
Returns the value of attribute config.
168 169 170 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 168

# Reader for the +config+ attribute.
#
# @return [Object] whatever is currently held in @config (nil when unset)
def config
  @config
end
.jobs ⇒ Object (readonly)
Returns the value of attribute jobs.
169 170 171 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 169

# Read-only accessor for the +jobs+ attribute.
#
# @return [Object] whatever is currently held in @jobs (nil when unset)
def jobs
  @jobs
end
.workers ⇒ Object (readonly)
Returns the value of attribute workers.
170 171 172 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 170

# Read-only accessor for the +workers+ attribute.
#
# @return [Object] whatever is currently held in @workers (nil when unset)
def workers
  @workers
end
Class Method Details
.check_s3_access ⇒ Object
180 181 182 183 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 180

# Smoke-tests the S3 connection by issuing a list_buckets call through
# s3_client, then logs a success line. Nothing is rescued here, so any error
# raised by the call propagates to the caller and the log line is skipped.
#
# @return [Object] the result of the logger.info call
def check_s3_access
  # The response is discarded on purpose; only "did the call succeed" matters.
  s3_client.list_buckets

  logger.info('[OK] s3 connection test')
end
.create_s3_buckets_if_not_existing ⇒ Object
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 185

# Makes sure the input, intermediate and output buckets named by
# SimpleMapReduce exist, creating whichever ones are missing and logging each
# creation.
#
# The existing bucket names are listed once up front and that listing is not
# refreshed after a create. The required names are therefore de-duplicated, so
# a bucket configured for more than one role is created (and logged) only once
# instead of triggering a second create_bucket call for the same name.
#
# Errors raised by the S3 client are not rescued here.
#
# @return [Object] the result of the final logger.info call
def create_s3_buckets_if_not_existing
  existing_names = s3_client.list_buckets.buckets.map(&:name)

  required_names = [
    SimpleMapReduce.s3_input_bucket_name,
    SimpleMapReduce.s3_intermediate_bucket_name,
    SimpleMapReduce.s3_output_bucket_name
  ].uniq

  # Array#- keeps the order of required_names: input, intermediate, output.
  (required_names - existing_names).each do |bucket_name|
    s3_client.create_bucket(bucket: bucket_name)
    logger.info("create bucket #{bucket_name}")
  end

  logger.info('[OK] confirmed that all necessary s3 buckets exist')
end
.fetch_available_workers(worker_size = 1) ⇒ Object
246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 246

# Reserves up to +worker_size+ ready workers and returns them.
#
# The whole selection runs inside Mutex#synchronize. Unlike a manual
# lock paired with a method-level ensure/unlock, this releases the mutex only
# when it was actually acquired here: if acquiring it raises (for example a
# ThreadError on recursive locking), a lock held by an outer frame is left
# untouched.
#
# @param worker_size [Integer] maximum number of workers to hand out
# @return [Array] the workers that were reserved, in the iteration order of
#   @workers; empty when no worker is registered or none reports ready?
def fetch_available_workers(worker_size = 1)
  mutex.synchronize do
    return [] if @workers.nil? || @workers.empty?

    # Pick the first ready workers, then flip each one to reserved before
    # handing it out so a concurrent caller cannot receive the same worker.
    @workers.each_value.select(&:ready?).take(worker_size).each(&:reserve)
  end
end
.job_manager ⇒ Object
280 281 282 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 280

# Lazily obtains the job manager: the first call stores the result of
# ::Rasteira::EmbedWorker::Manager.run in @job_manager, later calls return
# that stored value.
#
# @return [Object] the memoized result of ::Rasteira::EmbedWorker::Manager.run
def job_manager
  return @job_manager if @job_manager

  @job_manager = ::Rasteira::EmbedWorker::Manager.run
end
.logger ⇒ Object
292 293 294 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 292

# Shortcut to the logger exposed by SimpleMapReduce.
#
# @return [Object] whatever SimpleMapReduce.logger returns
def logger
  SimpleMapReduce.logger
end
.mutex ⇒ Object
284 285 286 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 284

# Lazily creates the Mutex stored in @mutex; every later call returns that
# same instance.
#
# @return [Mutex] the memoized mutex
def mutex
  return @mutex if @mutex

  @mutex = Mutex.new
end
.quit! ⇒ Object
297 298 299 300 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 297

# Asks the job manager to shut its workers down, then hands over to the
# inherited quit! implementation.
#
# @return [Object] whatever the inherited quit! returns
def quit!
  # Workers are stopped first; super runs only if this call does not raise.
  job_manager.shutdown_workers!

  super
end
.register_job(map_script:, map_class_name:, reduce_script:, reduce_class_name:, job_input_bucket_name:, job_input_directory_path:, job_output_bucket_name:, job_output_directory_path:, map_worker:) ⇒ Object
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 204

# Builds a ::SimpleMapReduce::Server::Job from the given attributes, enqueues
# it on the job manager for SimpleMapReduce::Worker::RegisterMapTaskWorker and
# records it in @jobs under its id.
#
# All keyword arguments are forwarded unchanged to Job.new.
#
# @param map_script [Object] forwarded to Job.new
# @param map_class_name [Object] forwarded to Job.new
# @param reduce_script [Object] forwarded to Job.new
# @param reduce_class_name [Object] forwarded to Job.new
# @param job_input_bucket_name [Object] forwarded to Job.new
# @param job_input_directory_path [Object] forwarded to Job.new
# @param job_output_bucket_name [Object] forwarded to Job.new
# @param job_output_directory_path [Object] forwarded to Job.new
# @param map_worker [Object] forwarded to Job.new
# @return [Object] the newly created job
def register_job(map_script:,
                 map_class_name:,
                 reduce_script:,
                 reduce_class_name:,
                 job_input_bucket_name:,
                 job_input_directory_path:,
                 job_output_bucket_name:,
                 job_output_directory_path:,
                 map_worker:)
  job_attributes = {
    map_script: map_script,
    map_class_name: map_class_name,
    reduce_script: reduce_script,
    reduce_class_name: reduce_class_name,
    job_input_directory_path: job_input_directory_path,
    job_input_bucket_name: job_input_bucket_name,
    job_output_bucket_name: job_output_bucket_name,
    job_output_directory_path: job_output_directory_path,
    map_worker: map_worker
  }
  new_job = ::SimpleMapReduce::Server::Job.new(**job_attributes)

  @jobs = {} if @jobs.nil?

  # The job is enqueued first and recorded afterwards, so a failing enqueue
  # leaves @jobs without an entry for it.
  job_manager.enqueue_job!(SimpleMapReduce::Worker::RegisterMapTaskWorker, args: new_job)
  @jobs[new_job.id] = new_job

  new_job
end
.register_worker(url:) ⇒ Object
236 237 238 239 240 241 242 243 244 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 236

# Creates a ::SimpleMapReduce::Server::Worker for +url+ and records it in
# @workers under its id.
#
# The registry update runs under the class mutex, matching
# fetch_available_workers and store_worker which guard the same hash. Without
# it, two concurrent first registrations could each see a nil @workers and one
# would overwrite the other's freshly created hash.
#
# @param url [Object] forwarded to Worker.new as the worker's url
# @return [Object] the newly created worker
def register_worker(url:)
  # Built outside the lock so the critical section covers only the hash write.
  worker = ::SimpleMapReduce::Server::Worker.new(url: url)

  mutex.synchronize do
    @workers ||= {}
    @workers[worker.id] = worker
  end

  worker
end
.s3_client ⇒ Object
288 289 290 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 288

# Shortcut to the client held by the SimpleMapReduce::S3Client instance.
#
# @return [Object] whatever SimpleMapReduce::S3Client.instance.client returns
def s3_client
  s3_wrapper = SimpleMapReduce::S3Client.instance
  s3_wrapper.client
end
.setup_job_tracker ⇒ Object
172 173 174 175 176 177 178 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 172

# Runs the start-up sequence in order: S3 access check, bucket creation,
# a job_manager call, then two log lines (a completion message and the job
# tracker url). Nothing is rescued, so a failing step aborts the remaining
# ones.
#
# @return [Object] the result of the final logger.info call
def setup_job_tracker
  check_s3_access
  create_s3_buckets_if_not_existing

  # Return value intentionally unused; the call is made for its effect.
  job_manager

  logger.info('All setup process is done successfully. The job tracker is operation ready.')

  tracker_url = SimpleMapReduce.job_tracker_url
  logger.info("This job tracker url: #{tracker_url}")
end
.store_worker(worker) ⇒ Object
268 269 270 271 272 273 274 275 276 277 278 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 268

# Puts +worker+ into the registry if it is not there yet and marks the
# registered entry as ready.
#
# When an entry with the same id already exists, that stored entry (not the
# argument) receives ready!. When none exists, including the case where
# @workers has not been created yet, the given worker is stored first;
# previously this path called ready! on nil and raised NoMethodError.
#
# Runs under Mutex#synchronize so the lock is released only if it was
# acquired by this call.
#
# @param worker [Object] an object responding to id and ready!
# @return [Object] the result of ready! on the registered worker
def store_worker(worker)
  mutex.synchronize do
    @workers ||= {}
    registered = (@workers[worker.id] ||= worker)
    registered.ready!
  end
end