Class: ElephantDriver::Job

Inherits:
Object
  • Object
show all
Defined in:
lib/elephant-driver/job.rb

Constant Summary collapse

STATES =
[ :running, :succeeded, :failed, :prep, :killed ]

Instance Method Summary collapse

Constructor Details

#initialize(cln, thrift_job) ⇒ Job

Returns a new instance of Job.



8
9
10
11
# File 'lib/elephant-driver/job.rb', line 8

def initialize(cln, thrift_job)
  @cln = cln
  @thrift_job = thrift_job
end

Instance Method Details

#cleanup_progressObject



49
50
51
# File 'lib/elephant-driver/job.rb', line 49

def cleanup_progress
  @thrift_job.status.cleanupProgress
end

#completed?Boolean

Returns:

  • (Boolean)


21
22
23
# File 'lib/elephant-driver/job.rb', line 21

def completed?
  state != :running
end

#config_paramsObject



61
62
63
64
65
66
67
68
# File 'lib/elephant-driver/job.rb', line 61

def config_params
  xml = call :getJobConfXML
  #@parsed_config ||= Nokogiri::XML(xml).xpath("//property").inject({}) { |props, xprop|
  #  props[xprop.xpath("./name").text] = xprop.xpath("./value").text
  #  props
  #}
  {}
end

#countersObject



70
71
72
73
74
75
76
77
78
79
# File 'lib/elephant-driver/job.rb', line 70

def counters
  counters = {}
  ret = call :getJobCounters
  ret.groups.each { |g|
    h = {}
    g.counters.each { |name, c| h[name] = c.value }
    counters[g.name] = h
  }
  counters
end

#finish_timeObject



33
34
35
# File 'lib/elephant-driver/job.rb', line 33

def finish_time
  @thrift_job.finishTime
end

#job_idObject



13
14
15
# File 'lib/elephant-driver/job.rb', line 13

def job_id
  @thrift_job.jobID
end

#kill!Object



117
118
119
# File 'lib/elephant-driver/job.rb', line 117

def kill!
  call :killJob
end

#launch_timeObject



29
30
31
# File 'lib/elephant-driver/job.rb', line 29

def launch_time
  @thrift_job.launchTime
end

#map_progressObject



41
42
43
# File 'lib/elephant-driver/job.rb', line 41

def map_progress
  @thrift_job.status.mapProgress
end

#progressObject



57
58
59
# File 'lib/elephant-driver/job.rb', line 57

def progress
  (@thrift_job.status.mapProgress + @thrift_job.status.reduceProgress) / 2.0
end

#reduce_progressObject



45
46
47
# File 'lib/elephant-driver/job.rb', line 45

def reduce_progress
  @thrift_job.status.reduceProgress
end

#set_priority(priority) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/elephant-driver/job.rb', line 104

def set_priority(priority)
  prio =
    case priority
    when :very_high then Hadoop::API::Jobtracker::ThriftJobPriority::VERY_HIGH
    when :high      then Hadoop::API::Jobtracker::ThriftJobPriority::HIGH
    when :normal    then Hadoop::API::Jobtracker::ThriftJobPriority::NORMAL
    when :low       then Hadoop::API::Jobtracker::ThriftJobPriority::LOW
    when :very_low  then Hadoop::API::Jobtracker::ThriftJobPriority::VERY_LOW
    else Hadoop::API::Jobtracker::ThriftJobPriority::NORMAL
    end
  call :setJobPriority, prio
end

#setup_progressObject



53
54
55
# File 'lib/elephant-driver/job.rb', line 53

def setup_progress
  @thrift_job.status.setupProgress
end

#start_timeObject



25
26
27
# File 'lib/elephant-driver/job.rb', line 25

def start_time
  @thrift_job.startTime
end

#stateObject



37
38
39
# File 'lib/elephant-driver/job.rb', line 37

def state
  STATES[@thrift_job.status.runState - 1]
end

#tasksObject



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/elephant-driver/job.rb', line 81

def tasks
  types = [
    Hadoop::API::Jobtracker::ThriftTaskType::MAP,
    Hadoop::API::Jobtracker::ThriftTaskType::REDUCE,
    Hadoop::API::Jobtracker::ThriftTaskType::JOB_SETUP,
    Hadoop::API::Jobtracker::ThriftTaskType::JOB_CLEANUP,
    Hadoop::API::Jobtracker::ThriftTaskType::TASK_CLEANUP,
  ]
  states = [
    Hadoop::API::Jobtracker::ThriftTaskState::RUNNING,
    Hadoop::API::Jobtracker::ThriftTaskState::SUCCEEDED,
    Hadoop::API::Jobtracker::ThriftTaskState::FAILED,
    Hadoop::API::Jobtracker::ThriftTaskState::UNASSIGNED,
    Hadoop::API::Jobtracker::ThriftTaskState::KILLED,
    Hadoop::API::Jobtracker::ThriftTaskState::COMMIT_PENDING,
    Hadoop::API::Jobtracker::ThriftTaskState::FAILED_UNCLEAN,
    Hadoop::API::Jobtracker::ThriftTaskState::KILLED_UNCLEAN,
  ]

  tasks = (call :getTaskList, types, states, '', 10000, 0).tasks
  return tasks.collect{ |t| Task.new(self, t) }
end

#userObject



17
18
19
# File 'lib/elephant-driver/job.rb', line 17

def user
  @thrift_job.status.user
end