Class: Mandy::Job

Inherits:
Object
  • Object
show all
Defined in:
lib/mandy/job.rb

Constant Summary collapse

JSON_PAYLOAD_KEY =
"json"

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, &blk) ⇒ Job

Returns a new instance of Job.



35
36
37
38
39
40
41
42
43
# File 'lib/mandy/job.rb', line 35

def initialize(name, &blk)
  @name = name
  @settings = {}
  @modules = []
  @map, @reduce = nil, nil
  set('mapred.job.name', name)
  instance_eval(&blk) if blk
  auto_set_reduce_count
end

Instance Attribute Details

#input_format_optionsObject (readonly)

Returns the value of attribute input_format_options.



33
34
35
# File 'lib/mandy/job.rb', line 33

def input_format_options
  @input_format_options
end

#nameObject (readonly)

Returns the value of attribute name.



32
33
34
# File 'lib/mandy/job.rb', line 32

def name
  @name
end

#settingsObject (readonly)

Returns the value of attribute settings.



31
32
33
# File 'lib/mandy/job.rb', line 31

def settings
  @settings
end

Class Method Details

.find_by_name(name) ⇒ Object



10
11
12
# File 'lib/mandy/job.rb', line 10

def find_by_name(name)
  jobs.find {|job| job.name == name }
end

.jobsObject



6
7
8
# File 'lib/mandy/job.rb', line 6

def jobs
  @jobs ||= []
end

.parameter(name) ⇒ Object



14
15
16
17
# File 'lib/mandy/job.rb', line 14

def parameter(name)
  param = find_json_param(name) if json_provided?
  param || ENV[name.to_s]
end

Instance Method Details

#input_format(format = nil, options = {}) ⇒ Object



50
51
52
53
54
55
# File 'lib/mandy/job.rb', line 50

def input_format(format=nil, options={})
  return @input_format if format.nil?
  
  @input_format = format
  @input_format_options = options
end

#map(klass = nil, &blk) ⇒ Object



90
91
92
# File 'lib/mandy/job.rb', line 90

def map(klass=nil, &blk)
  @map = klass || blk
end

#map_tasks(count) ⇒ Object



65
66
67
# File 'lib/mandy/job.rb', line 65

def map_tasks(count)
  set('mapred.map.tasks', count)
end

#mixin(*modules) ⇒ Object Also known as: serialize



45
46
47
# File 'lib/mandy/job.rb', line 45

def mixin(*modules)
  modules.each {|m| @modules << m}
end

#output_format(format) ⇒ Object



57
58
59
# File 'lib/mandy/job.rb', line 57

def output_format(format)
  @output_format = format
end

#reduce(klass = nil, &blk) ⇒ Object



94
95
96
# File 'lib/mandy/job.rb', line 94

def reduce(klass=nil, &blk)
  @reduce = klass || blk
end

#reduce_tasks(count) ⇒ Object



69
70
71
# File 'lib/mandy/job.rb', line 69

def reduce_tasks(count)
  set('mapred.reduce.tasks', count)
end

#reducer_defined?Boolean

Returns:

  • (Boolean)


111
112
113
# File 'lib/mandy/job.rb', line 111

def reducer_defined?
  !@reduce.nil?
end

#run_map(input = STDIN, output = STDOUT) {|mapper| ... } ⇒ Object

Yields:

  • (mapper)


98
99
100
101
102
103
# File 'lib/mandy/job.rb', line 98

def run_map(input=STDIN, output=STDOUT, &blk)
  mapper_class.send(:include, Mandy::IO::OutputFormatting) unless reducer_defined?
  mapper = mapper_class.new(input, output, @input_format, @output_format)
  yield(mapper) if blk
  mapper.execute
end

#run_reduce(input = STDIN, output = STDOUT) {|reducer| ... } ⇒ Object

Yields:

  • (reducer)


105
106
107
108
109
# File 'lib/mandy/job.rb', line 105

def run_reduce(input=STDIN, output=STDOUT, &blk)
  reducer = reducer_class.new(input, output, @input_format, @output_format)
  yield(reducer) if blk
  reducer.execute
end

#set(key, value) ⇒ Object



61
62
63
# File 'lib/mandy/job.rb', line 61

def set(key, value)
  @settings[key.to_s] = value.to_s
end

#setup(&blk) ⇒ Object



82
83
84
# File 'lib/mandy/job.rb', line 82

def setup(&blk)
  @setup = blk
end

#store(type, name, options = {}) ⇒ Object



73
74
75
76
77
78
79
80
# File 'lib/mandy/job.rb', line 73

def store(type, name, options={})
  Mandy.stores[name] = case type
  when :hbase
    Stores::HBase.new(options)
  else
    raise "Unknown store type #{type}"
  end
end

#teardown(&blk) ⇒ Object



86
87
88
# File 'lib/mandy/job.rb', line 86

def teardown(&blk)
  @teardown = blk
end