Class: ElephantDriver::Job
- Inherits:
-
Object
- Object
- ElephantDriver::Job
- Defined in:
- lib/elephant-driver/job.rb
Constant Summary collapse
- STATES =
[ :running, :succeeded, :failed, :prep, :killed ]
Instance Method Summary collapse
- #cleanup_progress ⇒ Object
- #completed? ⇒ Boolean
- #config_params ⇒ Object
- #counters ⇒ Object
- #finish_time ⇒ Object
-
#initialize(cln, thrift_job) ⇒ Job
constructor
A new instance of Job.
- #job_id ⇒ Object
- #kill! ⇒ Object
- #launch_time ⇒ Object
- #map_progress ⇒ Object
- #progress ⇒ Object
- #reduce_progress ⇒ Object
- #set_priority(priority) ⇒ Object
- #setup_progress ⇒ Object
- #start_time ⇒ Object
- #state ⇒ Object
- #tasks ⇒ Object
- #user ⇒ Object
Constructor Details
#initialize(cln, thrift_job) ⇒ Job
Returns a new instance of Job.
8 9 10 11 |
# File 'lib/elephant-driver/job.rb', line 8 def initialize(cln, thrift_job) @cln = cln @thrift_job = thrift_job end |
Instance Method Details
#cleanup_progress ⇒ Object
49 50 51 |
# File 'lib/elephant-driver/job.rb', line 49 def cleanup_progress @thrift_job.status.cleanupProgress end |
#completed? ⇒ Boolean
21 22 23 |
# File 'lib/elephant-driver/job.rb', line 21 def completed? state != :running end |
#config_params ⇒ Object
61 62 63 64 65 66 67 68 |
# File 'lib/elephant-driver/job.rb', line 61 def config_params xml = call :getJobConfXML #@parsed_config ||= Nokogiri::XML(xml).xpath("//property").inject({}) { |props, xprop| # props[xprop.xpath("./name").text] = xprop.xpath("./value").text # props #} {} end |
#counters ⇒ Object
70 71 72 73 74 75 76 77 78 79 |
# File 'lib/elephant-driver/job.rb', line 70 def counters counters = {} ret = call :getJobCounters ret.groups.each { |g| h = {} g.counters.each { |name, c| h[name] = c.value } counters[g.name] = h } counters end |
#finish_time ⇒ Object
33 34 35 |
# File 'lib/elephant-driver/job.rb', line 33 def finish_time @thrift_job.finishTime end |
#job_id ⇒ Object
13 14 15 |
# File 'lib/elephant-driver/job.rb', line 13 def job_id @thrift_job.jobID end |
#kill! ⇒ Object
117 118 119 |
# File 'lib/elephant-driver/job.rb', line 117 def kill! call :killJob end |
#launch_time ⇒ Object
29 30 31 |
# File 'lib/elephant-driver/job.rb', line 29 def launch_time @thrift_job.launchTime end |
#map_progress ⇒ Object
41 42 43 |
# File 'lib/elephant-driver/job.rb', line 41 def map_progress @thrift_job.status.mapProgress end |
#progress ⇒ Object
57 58 59 |
# File 'lib/elephant-driver/job.rb', line 57 def progress (@thrift_job.status.mapProgress + @thrift_job.status.reduceProgress) / 2.0 end |
#reduce_progress ⇒ Object
45 46 47 |
# File 'lib/elephant-driver/job.rb', line 45 def reduce_progress @thrift_job.status.reduceProgress end |
#set_priority(priority) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/elephant-driver/job.rb', line 104 def set_priority(priority) prio = case priority when :very_high then Hadoop::API::Jobtracker::ThriftJobPriority::VERY_HIGH when :high then Hadoop::API::Jobtracker::ThriftJobPriority::HIGH when :normal then Hadoop::API::Jobtracker::ThriftJobPriority::NORMAL when :low then Hadoop::API::Jobtracker::ThriftJobPriority::LOW when :very_low then Hadoop::API::Jobtracker::ThriftJobPriority::VERY_LOW else Hadoop::API::Jobtracker::ThriftJobPriority::NORMAL end call :setJobPriority, prio end |
#setup_progress ⇒ Object
53 54 55 |
# File 'lib/elephant-driver/job.rb', line 53 def setup_progress @thrift_job.status.setupProgress end |
#start_time ⇒ Object
25 26 27 |
# File 'lib/elephant-driver/job.rb', line 25 def start_time @thrift_job.startTime end |
#state ⇒ Object
37 38 39 |
# File 'lib/elephant-driver/job.rb', line 37 def state STATES[@thrift_job.status.runState - 1] end |
#tasks ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/elephant-driver/job.rb', line 81 def tasks types = [ Hadoop::API::Jobtracker::ThriftTaskType::MAP, Hadoop::API::Jobtracker::ThriftTaskType::REDUCE, Hadoop::API::Jobtracker::ThriftTaskType::JOB_SETUP, Hadoop::API::Jobtracker::ThriftTaskType::JOB_CLEANUP, Hadoop::API::Jobtracker::ThriftTaskType::TASK_CLEANUP, ] states = [ Hadoop::API::Jobtracker::ThriftTaskState::RUNNING, Hadoop::API::Jobtracker::ThriftTaskState::SUCCEEDED, Hadoop::API::Jobtracker::ThriftTaskState::FAILED, Hadoop::API::Jobtracker::ThriftTaskState::UNASSIGNED, Hadoop::API::Jobtracker::ThriftTaskState::KILLED, Hadoop::API::Jobtracker::ThriftTaskState::COMMIT_PENDING, Hadoop::API::Jobtracker::ThriftTaskState::FAILED_UNCLEAN, Hadoop::API::Jobtracker::ThriftTaskState::KILLED_UNCLEAN, ] tasks = (call :getTaskList, types, states, '', 10000, 0).tasks return tasks.collect{ |t| Task.new(self, t) } end |
#user ⇒ Object
17 18 19 |
# File 'lib/elephant-driver/job.rb', line 17 def user @thrift_job.status.user end |