Class: SimpleMapReduce::Server::JobTracker
- Inherits: Sinatra::Base
  (ancestry: Object → Sinatra::Base → SimpleMapReduce::Server::JobTracker)
- Defined in: lib/simple_map_reduce/server/job_tracker.rb
Constant Summary
- POLLING_INTERVAL = 10
  Number of seconds the worker-polling thread sleeps between status polls (used by .start_polling_workers).
Class Attribute Summary
- .config ⇒ Object
  Returns the value of attribute config.
- .jobs ⇒ Object (readonly)
  Returns the value of attribute jobs.
- .workers ⇒ Object (readonly)
  Returns the value of attribute workers.
Class Method Summary
- .check_s3_access ⇒ Object
- .create_s3_buckets_if_not_existing ⇒ Object
- .fetch_available_workers(worker_size = 1) ⇒ Object
- .job_manager ⇒ Object
- .logger ⇒ Object
- .mutex ⇒ Object
- .quit! ⇒ Object
- .register_job(map_script:, map_class_name:, reduce_script:, reduce_class_name:, job_input_bucket_name:, job_input_directory_path:, job_output_bucket_name:, job_output_directory_path:, map_worker:) ⇒ Object
- .register_worker(url:) ⇒ Object
- .s3_client ⇒ Object
- .setup_job_tracker ⇒ Object
- .start_polling_workers ⇒ Object
- .store_worker(worker) ⇒ Object
Class Attribute Details
.config ⇒ Object
Returns the value of attribute config.
168 169 170 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 168
# Reader for the job tracker's configuration attribute.
#
# @return [Object, nil] the current value of +@config+ (nil if never assigned)
def config
  @config
end
.jobs ⇒ Object (readonly)
Returns the value of attribute jobs.
169 170 171 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 169
# Read-only view of the job registry.
#
# @return [Hash, nil] jobs keyed by job id, as filled in by .register_job;
#   nil until the first job has been registered
def jobs
  @jobs
end
.workers ⇒ Object (readonly)
Returns the value of attribute workers.
170 171 172 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 170
# Read-only view of the worker registry.
#
# @return [Hash, nil] workers keyed by worker id, as filled in by
#   .register_worker; nil until the registry has been initialised
def workers
  @workers
end
Class Method Details
.check_s3_access ⇒ Object
181 182 183 184 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 181
# Connectivity smoke test: issues a single ListBuckets call against S3.
# Errors from the client are not rescued here, so they propagate to the
# caller; when the call returns, a confirmation line is logged.
#
# @return the value returned by logger.info
def check_s3_access
  client = s3_client
  client.list_buckets # presumably raises when S3 is unreachable or credentials are rejected
  logger.info('[OK] s3 connection test')
end
.create_s3_buckets_if_not_existing ⇒ Object
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 186
# Makes sure the input, intermediate and output buckets configured on
# SimpleMapReduce exist, creating whichever are missing (in that order)
# and logging each creation.
#
# The existing bucket names are listed once up front. The configured names
# are de-duplicated first: when two roles share one bucket, CreateBucket is
# issued only once for it. A second request for a bucket created moments
# earlier may be rejected by S3 (BucketAlreadyOwnedByYou outside us-east-1).
def create_s3_buckets_if_not_existing
  current_bucket_names = s3_client.list_buckets.buckets.map(&:name)
  required_bucket_names = [
    SimpleMapReduce.s3_input_bucket_name,
    SimpleMapReduce.s3_intermediate_bucket_name,
    SimpleMapReduce.s3_output_bucket_name
  ].uniq

  # Array#- keeps the receiver's order, so creation order is unchanged.
  (required_bucket_names - current_bucket_names).each do |bucket_name|
    s3_client.create_bucket(bucket: bucket_name)
    logger.info("create bucket #{bucket_name}")
  end

  logger.info('[OK] confirmed that all necessary s3 buckets exist')
end
.fetch_available_workers(worker_size = 1) ⇒ Object
247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 247
# Picks up to +worker_size+ workers that report +ready?+, transitions each
# with +reserve!+, and returns those that were reserved.
#
# The registry is read and the workers are reserved while holding +mutex+.
# A worker whose +reserve!+ raises is logged and left out of the result, so
# fewer than +worker_size+ workers (possibly none) may be returned.
#
# @param worker_size [Integer] maximum number of workers to reserve
# @return [Array] the successfully reserved workers; [] when the registry is
#   nil or empty, or when no worker is ready
def fetch_available_workers(worker_size = 1)
  # Mutex#synchronize rather than lock + ensure/unlock. If acquiring fails
  # (e.g. ThreadError on recursive locking), an ensure-based unlock would
  # release a lock this call never acquired, namely the caller's.
  mutex.synchronize do
    return [] if @workers.nil? || @workers.empty?

    candidate_ids = @workers.select { |_id, worker| worker.ready? }.keys.take(worker_size)
    candidate_ids.map do |worker_id|
      worker = @workers[worker_id]
      begin
        worker.reserve!
        worker
      rescue StandardError => e
        logger.error("Failed to transit the worker state: `#{worker}`")
        logger.error(e.inspect)
        nil
      end
    end.compact
  end
end
.job_manager ⇒ Object
302 303 304 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 302
# Returns the embedded Rasteira worker manager. It is obtained from
# ::Rasteira::EmbedWorker::Manager.run on the first call (presumably starting
# it) and memoized, so later calls return the same instance.
#
# @return [Object] the memoized result of Manager.run
def job_manager
  return @job_manager if @job_manager

  @job_manager = ::Rasteira::EmbedWorker::Manager.run
end
.logger ⇒ Object
314 315 316 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 314
# Delegates to the library-wide logger, so every log line emitted by the
# job tracker goes wherever SimpleMapReduce.logger writes.
#
# @return [Object] whatever SimpleMapReduce.logger returns
def logger
  SimpleMapReduce.logger
end
.mutex ⇒ Object
306 307 308 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 306
# Lock that fetch_available_workers and store_worker hold while touching the
# worker registry (@workers). Created on first use and memoized.
#
# NOTE(review): the lazy initialisation itself is not synchronised. Two
# threads racing on the very first call could each build their own Mutex.
# Consider creating it eagerly at class-definition time.
#
# @return [Mutex]
def mutex
  return @mutex if @mutex

  @mutex = Mutex.new
end
.quit! ⇒ Object
320 321 322 323 324 325 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 320
# Shuts the job tracker down, in this order:
#   1. stop the worker-polling loop and kill its thread,
#   2. shut down the embedded job manager's workers, if a manager exists,
#   3. defer to Sinatra::Base.quit!.
def quit!
  @keep_polling_workers = false
  @polling_workers_thread&.kill
  # Read the ivar rather than calling the memoizing #job_manager reader.
  # The reader would start a brand-new manager here if none had been
  # started, only to shut it straight down.
  @job_manager&.shutdown_workers!
  super
end
.register_job(map_script:, map_class_name:, reduce_script:, reduce_class_name:, job_input_bucket_name:, job_input_directory_path:, job_output_bucket_name:, job_output_directory_path:, map_worker:) ⇒ Object
205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 205
# Registers a new map/reduce job.
#
# Every keyword argument is forwarded unchanged to
# SimpleMapReduce::Server::Job.new. The resulting job is enqueued on the job
# manager for SimpleMapReduce::Worker::RegisterMapTaskWorker and then stored
# in the job registry under its id. If enqueueing raises, the job is not
# stored.
#
# @return [SimpleMapReduce::Server::Job] the newly created job
def register_job(map_script:, map_class_name:,
                 reduce_script:, reduce_class_name:,
                 job_input_bucket_name:, job_input_directory_path:,
                 job_output_bucket_name:, job_output_directory_path:,
                 map_worker:)
  new_job = ::SimpleMapReduce::Server::Job.new(
    map_script: map_script,
    map_class_name: map_class_name,
    reduce_script: reduce_script,
    reduce_class_name: reduce_class_name,
    job_input_directory_path: job_input_directory_path,
    job_input_bucket_name: job_input_bucket_name,
    job_output_bucket_name: job_output_bucket_name,
    job_output_directory_path: job_output_directory_path,
    map_worker: map_worker
  )

  @jobs = {} if @jobs.nil?

  job_manager.enqueue_job!(SimpleMapReduce::Worker::RegisterMapTaskWorker, args: new_job)
  @jobs[new_job.id] = new_job
  new_job
end
.register_worker(url:) ⇒ Object
237 238 239 240 241 242 243 244 245 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 237
# Creates a worker record for +url+ with the 'remote' data store and adds it
# to the worker registry under its id.
#
# The registry insertion holds +mutex+, the same lock fetch_available_workers
# and store_worker take around their reads and writes of @workers. Without
# it, adding a key while another thread iterates the hash raises
# "can't add a new key into hash during iteration".
#
# @param url worker endpoint URL, forwarded to SimpleMapReduce::Server::Worker.new
# @return [SimpleMapReduce::Server::Worker] the registered worker
def register_worker(url:)
  worker = ::SimpleMapReduce::Server::Worker.new(url: url, data_store_type: 'remote')
  mutex.synchronize do
    @workers ||= {}
    @workers[worker.id] = worker
  end
  worker
end
.s3_client ⇒ Object
310 311 312 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 310
# Returns the S3 client held by SimpleMapReduce::S3Client.instance
# (presumably a process-wide singleton; confirm in S3Client).
#
# @return [Object] the +client+ of the S3Client instance
def s3_client
  holder = SimpleMapReduce::S3Client.instance
  holder.client
end
.setup_job_tracker ⇒ Object
172 173 174 175 176 177 178 179 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 172
# Boot sequence for the job tracker, run in a fixed order:
#   1. check that S3 answers a ListBuckets call,
#   2. create any missing input/intermediate/output buckets,
#   3. build the embedded job manager (memoized by .job_manager),
#   4. start the background worker-polling thread.
# Nothing here rescues errors: a step that raises aborts the sequence, and
# the success messages are logged only after all four steps have returned.
def setup_job_tracker
  check_s3_access
  create_s3_buckets_if_not_existing
  job_manager # called for its side effect of building the manager now
  start_polling_workers

  logger.info('All setup process is done successfully. The job tracker is operation ready.')
  tracker_url = SimpleMapReduce.job_tracker_url
  logger.info("This job tracker url: #{tracker_url}")
end
.start_polling_workers ⇒ Object
288 289 290 291 292 293 294 295 296 297 298 299 300 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 288
# Starts a background thread that repeatedly enqueues a
# PollingWorkersStatusWorker job over the current worker registry (or {}
# when none exists yet), sleeping POLLING_INTERVAL seconds between rounds.
# The loop exits after @keep_polling_workers is cleared (quit! clears it and
# also kills the thread).
#
# A failed enqueue is logged and retried on the next round instead of
# terminating the thread, which would silently stop polling for the rest of
# the process lifetime.
#
# @return [Thread] the polling thread, also kept in @polling_workers_thread
def start_polling_workers
  @keep_polling_workers = true
  # Thread.new already schedules the thread. No explicit Thread#run: it could
  # wake the thread mid-sleep (causing an extra enqueue) or raise ThreadError
  # if the thread had already died.
  @polling_workers_thread = Thread.new do
    while @keep_polling_workers
      begin
        job_manager.enqueue_job!(SimpleMapReduce::Worker::PollingWorkersStatusWorker, args: @workers || {})
      rescue StandardError => e
        logger.error('Failed to enqueue PollingWorkersStatusWorker')
        logger.error(e.inspect)
      end
      sleep(POLLING_INTERVAL)
    end
  end
end
.store_worker(worker) ⇒ Object
274 275 276 277 278 279 280 281 282 283 284 |
# File 'lib/simple_map_reduce/server/job_tracker.rb', line 274
# Returns a worker to the pool by calling +ready!+ on its registry entry.
#
# When the registry already holds an entry with +worker.id+, that registered
# instance is the one marked ready. When the id is unknown (including right
# after the registry is initialised), +worker+ itself is stored and marked
# ready, instead of +ready!+ being called on nil.
# Runs while holding +mutex+.
#
# @param worker an object responding to +id+ and +ready!+
# @return whatever +ready!+ returns
def store_worker(worker)
  # synchronize (not lock + ensure/unlock) so a failed acquisition can never
  # release a lock held by the caller.
  mutex.synchronize do
    @workers ||= {}
    (@workers[worker.id] ||= worker).ready!
  end
end