Class: ContainedMr::Job
- Inherits:
-
Object
- Object
- ContainedMr::Job
- Includes:
- JobLogic
- Defined in:
- lib/contained_mr/job.rb
Overview
A map-reduce job.
Instance Attribute Summary
Attributes included from JobLogic
#id, #item_count, #mapper_image_id, #name_prefix, #reducer_image_id, #template
Instance Method Summary collapse
-
#build_mapper_image(mapper_input) ⇒ String
Builds the Docker image used to run this job’s mappers.
-
#build_reducer_image ⇒ String
Builds the Docker image used to run this job’s reducer.
-
#destroy! ⇒ ContainedMr::Job
Tears down the job’s state.
-
#initialize(template, id, json_options) ⇒ Job
constructor
A new instance of Job.
-
#reducer_tar_context ⇒ IO
Builds the .tar context used to create the reducer’s Docker image.
-
#run_mapper(i) ⇒ ContainedMr::Runner
Runs one of the job’s mappers.
-
#run_reducer ⇒ ContainedMr::Runner
Runs the job’s reducer.
Methods included from JobLogic
#mapper_container_options, #mapper_image_options, #mapper_image_tag, #mapper_runner, #reducer_container_options, #reducer_image_options, #reducer_image_tag, #reducer_runner
Constructor Details
#initialize(template, id, json_options) ⇒ Job
Returns a new instance of Job.
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/contained_mr/job.rb', line 13 def initialize(template, id, ) @template = template @id = id @name_prefix = template.name_prefix @item_count = template.item_count @mapper_image_id = nil @reducer_image_id = nil @mappers = Array.new @item_count @reducer = nil @mapper_options = nil @reducer_options = nil end |
Instance Method Details
#build_mapper_image(mapper_input) ⇒ String
Builds the Docker image used to run this job’s mappers.
62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/contained_mr/job.rb', line 62 def build_mapper_image(mapper_input) unless @mapper_image_id.nil? raise RuntimeError, 'Mapper image already exists' end tar_io = mapper_tar_context mapper_input Docker::Image.build_from_tar tar_io, # NOTE: The build process returns a short image ID. We need to perform # another API call to get the canonical ID. @mapper_image_id = Docker::Image.get(mapper_image_tag).id end |
#build_reducer_image ⇒ String
Builds the Docker image used to run this job’s reducer.
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/contained_mr/job.rb', line 78 def build_reducer_image unless @reducer_image_id.nil? raise RuntimeError, 'Reducer image already exists' end 1.upto @item_count do |i| raise RuntimeError, 'Not all mappers ran' if mapper_runner(i).nil? end tar_io = reducer_tar_context Docker::Image.build_from_tar tar_io, # NOTE: The build process returns a short image ID. We need to perform # another API call to get the canonical ID. @reducer_image_id = Docker::Image.get(reducer_image_tag).id end |
#destroy! ⇒ ContainedMr::Job
Tears down the job’s state.
This removes the job’s containers, as well as the mapper and reducer Docker images, if they still exist.
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/contained_mr/job.rb', line 35 def destroy! @mappers.each do |mapper| mapper.destroy! unless mapper.nil? end @reducer.destroy! unless @reducer.nil? unless @mapper_image_id.nil? # HACK(pwnall): Trick docker-api into issuing a DELETE request by tag. image = Docker::Image.new Docker.connection, 'id' => mapper_image_tag image.remove @mapper_image_id = nil end unless @reducer_image_id.nil? # HACK(pwnall): Trick docker-api into issuing a DELETE request by tag. image = Docker::Image.new Docker.connection, 'id' => reducer_image_tag image.remove @reducer_image_id = nil end self end |
#reducer_tar_context ⇒ IO
Builds the .tar context used to create the reducer’s Docker image.
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/contained_mr/job.rb', line 128 def reducer_tar_context tar_buffer = StringIO.new Gem::Package::TarWriter.new tar_buffer do |tar| tar.add_file 'Dockerfile', 0644 do |docker_io| docker_io.write @template.reducer_dockerfile end @mappers.each_with_index do |mapper, index| i = index + 1 if mapper.output tar.add_file "#{i}.out", 0644 do |io| io.write mapper.output end end tar.add_file("#{i}.stdout", 0644) { |io| io.write mapper.stdout } tar.add_file("#{i}.stderr", 0644) { |io| io.write mapper.stderr } tar.add_file("#{i}.json", 0644) do |io| io.write mapper.json_file.to_json end end end tar_buffer.rewind tar_buffer end |
#run_mapper(i) ⇒ ContainedMr::Runner
Runs one of the job’s mappers.
98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/contained_mr/job.rb', line 98 def run_mapper(i) if i < 1 || i > @item_count raise ArgumentError, "Invalid mapper number #{i}" end raise RuntimeError, 'Mapper image does not exist' if @mapper_image_id.nil? mapper = ContainedMr::Runner.new (i), @mapper_options[:wait_time], @template.mapper_output_path @mappers[i - 1] = mapper mapper.perform end |
#run_reducer ⇒ ContainedMr::Runner
Runs one the job’s reducer.
113 114 115 116 117 118 119 120 121 122 |
# File 'lib/contained_mr/job.rb', line 113 def run_reducer if @reducer_image_id.nil? raise RuntimeError, 'Reducer image does not exist' end reducer = ContainedMr::Runner.new , @reducer_options[:wait_time], @template.reducer_output_path @reducer = reducer @reducer.perform end |