Class: ContainedMr::Job

Inherits:
Object
Includes:
JobLogic
Defined in:
lib/contained_mr/job.rb

Overview

A map-reduce job.

Instance Attribute Summary

Attributes included from JobLogic

#id, #item_count, #mapper_image_id, #name_prefix, #reducer_image_id, #template

Instance Method Summary

Methods included from JobLogic

#mapper_container_options, #mapper_image_options, #mapper_image_tag, #mapper_runner, #reducer_container_options, #reducer_image_options, #reducer_image_tag, #reducer_runner

Constructor Details

#initialize(template, id, json_options) ⇒ Job

Returns a new instance of Job.

See Also:

  • ContainedMr::TemplateLogic#new_job


# File 'lib/contained_mr/job.rb', line 13

def initialize(template, id, json_options)
  @template = template
  @id = id
  @name_prefix = template.name_prefix
  @item_count = template.item_count

  @mapper_image_id = nil
  @reducer_image_id = nil

  @mappers = Array.new @item_count
  @reducer = nil
  @mapper_options = nil
  @reducer_options = nil
  parse_options json_options
end

Instance Method Details

#build_mapper_image(mapper_input) ⇒ String

Builds the Docker image used to run this job’s mappers.

Parameters:

  • mapper_input (String)

    data passed to the mappers

Returns:

  • (String)

    the newly built Docker image’s ID



# File 'lib/contained_mr/job.rb', line 62

def build_mapper_image(mapper_input)
  unless @mapper_image_id.nil?
    raise RuntimeError, 'Mapper image already exists'
  end

  tar_io = mapper_tar_context mapper_input
  Docker::Image.build_from_tar tar_io, mapper_image_options

  # NOTE: The build process returns a short image ID. We need to perform
  #       another API call to get the canonical ID.
  @mapper_image_id = Docker::Image.get(mapper_image_tag).id
end
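The build-once guard and the second lookup by tag can be illustrated without Docker. This is a hypothetical sketch: `FakeRegistry` and `build_image_once` are not part of ContainedMr or docker-api, and the SHA-256 digest of the build context only stands in for an image ID. It assumes, as the NOTE in the source says, that the build step reports a short ID while a lookup by tag reports the full one.

```ruby
require 'digest'

# Hypothetical stand-in for the Docker daemon. Building reports only a short
# (12-character) ID; looking an image up by tag reports the full ID.
class FakeRegistry
  def initialize
    @images = {}
  end

  def build(tag, context)
    full_id = Digest::SHA256.hexdigest(context) # placeholder for a real image ID
    @images[tag] = full_id
    full_id[0, 12]
  end

  def get(tag)
    @images.fetch(tag)
  end
end

# Mirrors the shape of Job#build_mapper_image: refuse to build twice, build,
# then ask for the canonical ID by tag instead of trusting the build result.
def build_image_once(registry, current_id, tag, context)
  raise RuntimeError, 'Mapper image already exists' unless current_id.nil?

  registry.build tag, context
  registry.get tag
end

registry = FakeRegistry.new
image_id = build_image_once(registry, nil, 'job-1-mapper', 'tar bytes')
# image_id is the full 64-character digest, not the 12-character short form.
```

Passing the stored ID back in, as a second call to build_mapper_image effectively does, raises instead of rebuilding.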

#build_reducer_image ⇒ String

Builds the Docker image used to run this job’s reducer.

Returns:

  • (String)

    the newly built Docker image’s ID



# File 'lib/contained_mr/job.rb', line 78

def build_reducer_image
  unless @reducer_image_id.nil?
    raise RuntimeError, 'Reducer image already exists'
  end
  1.upto @item_count do |i|
    raise RuntimeError, 'Not all mappers ran' if mapper_runner(i).nil?
  end

  tar_io = reducer_tar_context
  Docker::Image.build_from_tar tar_io, reducer_image_options

  # NOTE: The build process returns a short image ID. We need to perform
  #       another API call to get the canonical ID.
  @reducer_image_id = Docker::Image.get(reducer_image_tag).id
end
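build_reducer_image refuses to run until every mapper has a runner. A caller that wants to report which mappers are still outstanding could use a helper like the hypothetical `pending_mappers` below. It assumes that JobLogic#mapper_runner(i) returns the runner stored for mapper i, which is nil until #run_mapper has been called for it; this page does not state that explicitly.

```ruby
# Hypothetical diagnostic helper (not part of ContainedMr). Given the job's
# per-mapper runner slots, where mapper i lives in slot i - 1 and nil means
# "not run yet", it returns the 1-based numbers of the mappers still missing.
def pending_mappers(mapper_slots)
  (1..mapper_slots.length).select { |i| mapper_slots[i - 1].nil? }
end

slots = [:runner1, nil, :runner3, nil]
pending_mappers(slots) # => [2, 4]; building the reducer image now would raise
```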

#destroy! ⇒ ContainedMr::Job

Tears down the job’s state.

This removes the job’s containers, as well as the mapper and reducer Docker images, if they still exist.

Returns:

  • (ContainedMr::Job)

    self

# File 'lib/contained_mr/job.rb', line 35

def destroy!
  @mappers.each do |mapper|
    mapper.destroy! unless mapper.nil?
  end
  @reducer.destroy! unless @reducer.nil?

  unless @mapper_image_id.nil?
    # HACK(pwnall): Trick docker-api into issuing a DELETE request by tag.
    image = Docker::Image.new Docker.connection, 'id' => mapper_image_tag
    image.remove
    @mapper_image_id = nil
  end

  unless @reducer_image_id.nil?
    # HACK(pwnall): Trick docker-api into issuing a DELETE request by tag.
    image = Docker::Image.new Docker.connection, 'id' => reducer_image_tag
    image.remove
    @reducer_image_id = nil
  end

  self
end
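Because destroy! clears each image ID after removing the image, calling it a second time does not try to remove the same image again. The Docker-free model below (hypothetical `FakeImageStore` and `TeardownSketch`, with made-up tags) reproduces only the image-cleanup half. It says nothing about whether ContainedMr::Runner#destroy! tolerates repeated calls, since destroy! calls it on every existing runner each time.

```ruby
# Hypothetical stand-in for the Docker daemon; it records every removal.
class FakeImageStore
  attr_reader :removed

  def initialize
    @removed = []
  end

  def remove(tag)
    @removed << tag
  end
end

# Hypothetical model of the image-cleanup half of Job#destroy!.
class TeardownSketch
  def initialize(store, mapper_image_id, reducer_image_id)
    @store = store
    @mapper_image_id = mapper_image_id
    @reducer_image_id = reducer_image_id
  end

  def destroy!
    unless @mapper_image_id.nil?
      @store.remove 'mapper-tag'
      @mapper_image_id = nil # a later destroy! skips this image
    end
    unless @reducer_image_id.nil?
      @store.remove 'reducer-tag'
      @reducer_image_id = nil
    end
    self
  end
end

store = FakeImageStore.new
job = TeardownSketch.new(store, 'mapper-id', nil) # reducer image never built
job.destroy!.destroy!
store.removed # => ["mapper-tag"]
```

Returning self is what makes the chained call above possible.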

#reducer_tar_context ⇒ IO

Builds the .tar context used to create the reducer’s Docker image.

Returns:

  • (IO)

    an IO implementation that sources the .tar file



# File 'lib/contained_mr/job.rb', line 128

def reducer_tar_context
  tar_buffer = StringIO.new
  Gem::Package::TarWriter.new tar_buffer do |tar|
    tar.add_file 'Dockerfile', 0644 do |docker_io|
      docker_io.write @template.reducer_dockerfile
    end
    @mappers.each_with_index do |mapper, index|
      i = index + 1

      if mapper.output
        tar.add_file "#{i}.out", 0644 do |io|
          io.write mapper.output
        end
      end
      tar.add_file("#{i}.stdout", 0644) { |io| io.write mapper.stdout }
      tar.add_file("#{i}.stderr", 0644) { |io| io.write mapper.stderr }

      tar.add_file("#{i}.json", 0644) do |io|
        io.write mapper.json_file.to_json
      end
    end
  end
  tar_buffer.rewind
  tar_buffer
end
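The archive layout can be checked with the standard library alone. This sketch copies the loop above into a free function that takes the Dockerfile text and a list of mapper results, then reads the archive back with Gem::Package::TarReader. `FakeMapper`, `sketch_reducer_tar`, and `tar_entries` are hypothetical names; FakeMapper models only the four readers (output, stdout, stderr, json_file) that reducer_tar_context calls on each runner.

```ruby
require 'json'
require 'rubygems/package'
require 'stringio'

# Hypothetical stand-in for a finished ContainedMr::Runner, modeling only the
# readers that #reducer_tar_context calls.
FakeMapper = Struct.new(:output, :stdout, :stderr, :json_file)

# Same layout as Job#reducer_tar_context: a Dockerfile, then for mapper i the
# files i.out (only if it produced output), i.stdout, i.stderr and i.json.
def sketch_reducer_tar(dockerfile, mappers)
  tar_buffer = StringIO.new
  Gem::Package::TarWriter.new tar_buffer do |tar|
    tar.add_file('Dockerfile', 0644) { |io| io.write dockerfile }
    mappers.each_with_index do |mapper, index|
      i = index + 1
      if mapper.output
        tar.add_file("#{i}.out", 0644) { |io| io.write mapper.output }
      end
      tar.add_file("#{i}.stdout", 0644) { |io| io.write mapper.stdout }
      tar.add_file("#{i}.stderr", 0644) { |io| io.write mapper.stderr }
      tar.add_file("#{i}.json", 0644) { |io| io.write mapper.json_file.to_json }
    end
  end
  tar_buffer.rewind
  tar_buffer
end

# Reads an archive back into a Hash of entry name => contents.
def tar_entries(io)
  entries = {}
  Gem::Package::TarReader.new(io) do |reader|
    reader.each { |entry| entries[entry.full_name] = entry.read.to_s }
  end
  entries
end

mappers = [
  FakeMapper.new('a-out', 'a-stdout', '', { 'status' => 0 }),
  FakeMapper.new(nil, 'b-stdout', 'b-stderr', { 'status' => 1 })
]
entries = tar_entries(sketch_reducer_tar("FROM busybox\n", mappers))
entries.keys
# => ["Dockerfile", "1.out", "1.stdout", "1.stderr", "1.json",
#     "2.stdout", "2.stderr", "2.json"]
```

Mapper 2 has no output, so its archive contains no 2.out file; the reducer’s Dockerfile has to cope with that gap.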

#run_mapper(i) ⇒ ContainedMr::Runner

Runs one of the job’s mappers.

Parameters:

  • i (Number)

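    the number of the mapper to run, from 1 to the job’s item count

Returns:

  • (ContainedMr::Runner)

Raises:

  • (ArgumentError)

    if i is less than 1 or greater than the job’s item count

  • (RuntimeError)

    if the mapper image has not been built yet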


# File 'lib/contained_mr/job.rb', line 98

def run_mapper(i)
  if i < 1 || i > @item_count
    raise ArgumentError, "Invalid mapper number #{i}"
  end
  raise RuntimeError, 'Mapper image does not exist' if @mapper_image_id.nil?

  mapper = ContainedMr::Runner.new mapper_container_options(i),
      @mapper_options[:wait_time], @template.mapper_output_path
  @mappers[i - 1] = mapper
  mapper.perform
end
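run_mapper numbers mappers from 1, matching the i.out / i.stdout names in the reducer’s build context, while storing runners in a 0-based array. The hypothetical `mapper_slot` helper below isolates that conversion and the range check; it is an illustration, not a method of ContainedMr::Job.

```ruby
# Hypothetical helper mirroring the argument check in Job#run_mapper:
# mapper numbers run from 1 to item_count, and mapper i is stored at
# index i - 1 of the job's runner array.
def mapper_slot(i, item_count)
  if i < 1 || i > item_count
    raise ArgumentError, "Invalid mapper number #{i}"
  end
  i - 1
end

runners = Array.new(3) # like @mappers right after #initialize
runners[mapper_slot(2, 3)] = :runner2
runners # => [nil, :runner2, nil]
```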

#run_reducer ⇒ ContainedMr::Runner

Runs the job’s reducer.

Returns:

  • (ContainedMr::Runner)

# File 'lib/contained_mr/job.rb', line 113

def run_reducer
  if @reducer_image_id.nil?
    raise RuntimeError, 'Reducer image does not exist'
  end

  reducer = ContainedMr::Runner.new reducer_container_options,
      @reducer_options[:wait_time], @template.reducer_output_path
  @reducer = reducer
  @reducer.perform
end
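Taken together, the methods on this page imply a fixed order: build the mapper image, run every mapper, build the reducer image, run the reducer, then tear down. The hypothetical `run_whole_job` driver below encodes that order using only the documented method names and #item_count, so it can be exercised with a recording stub (`RecordingJob`, also hypothetical) instead of Docker. It runs mappers one after another; whether a real Job tolerates concurrent run_mapper calls is not covered here. Reading the reducer’s results inside the block, before destroy!, avoids depending on what a Runner keeps after teardown.

```ruby
# Hypothetical driver: runs a job end to end and always tears it down.
# The block receives the reducer's runner before destroy! is called.
def run_whole_job(job, mapper_input)
  job.build_mapper_image mapper_input
  1.upto(job.item_count) { |i| job.run_mapper i }
  job.build_reducer_image
  yield job.run_reducer
ensure
  job.destroy! # runs on success and when any step raises
end

# Hypothetical stub that records calls instead of talking to Docker.
class RecordingJob
  attr_reader :calls, :item_count

  def initialize(item_count)
    @item_count = item_count
    @calls = []
  end

  def build_mapper_image(input)
    @calls << [:build_mapper_image, input]
  end

  def run_mapper(i)
    @calls << [:run_mapper, i]
  end

  def build_reducer_image
    @calls << [:build_reducer_image]
  end

  def run_reducer
    @calls << [:run_reducer]
    :reducer_runner
  end

  def destroy!
    @calls << [:destroy!]
    self
  end
end

job = RecordingJob.new(2)
result = run_whole_job(job, 'input data') { |reducer| reducer }
```

With a real job, the block is where the reducer runner’s output would be read.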