Class: Chimp::ExecutionGroup
- Inherits:
-
Object
- Object
- Chimp::ExecutionGroup
- Defined in:
- lib/right_chimp/queue/ExecutionGroup.rb
Overview
An ExecutionGroup contains a set of Executors to be processed.
Only the subclasses SerialExecutionGroup and ParallelExecutionGroup should be used directly.
Direct Known Subclasses
Instance Attribute Summary collapse
-
#concurrency ⇒ Object
Returns the value of attribute concurrency.
-
#description ⇒ Object
Returns the value of attribute description.
-
#group_id ⇒ Object
Returns the value of attribute group_id.
-
#time_end ⇒ Object
readonly
Returns the value of attribute time_end.
-
#time_start ⇒ Object
readonly
Returns the value of attribute time_start.
Instance Method Summary collapse
-
#cancel(id) ⇒ Object
Cancel a job by id.
-
#done? ⇒ Boolean
An execution group is “done” if nothing is queued or running and at least one job has completed.
-
#get_job(i) ⇒ Object
Get a particular job.
-
#get_job_ids ⇒ Object
Get all job ids.
-
#get_jobs ⇒ Object
Get all jobs.
-
#get_jobs_by_status(status) ⇒ Object
Get jobs by status.
-
#get_total_exec_time ⇒ Object
Return total execution time.
-
#initialize(new_group_id = nil) ⇒ ExecutionGroup
constructor
A new instance of ExecutionGroup.
- #job_completed ⇒ Object
-
#push(j) ⇒ Object
Add something to the work queue.
-
#queue(id) ⇒ Object
Queue a held job by id.
-
#ready? ⇒ Boolean
An execution group is “ready” if it has work that can be done; see implementation in child classes.
-
#requeue(id) ⇒ Object
Requeue a job by id.
-
#requeue_failed_jobs! ⇒ Object
Requeue all failed jobs.
-
#reset! ⇒ Object
Reset the queue.
-
#results ⇒ Object
Return a hash of the results.
-
#running? ⇒ Boolean
Is this execution group running anything?
-
#set_jobs(jobs = []) ⇒ Object
Reset all jobs and bulk set them.
-
#shift ⇒ Object
Take something from the queue.
-
#size ⇒ Object
Size of the active queue.
-
#sort! ⇒ Object
Sort queue by server nickname.
-
#to_s ⇒ Object
Print out ExecutionGroup information.
Constructor Details
#initialize(new_group_id = nil) ⇒ ExecutionGroup
Returns a new instance of ExecutionGroup.
28 29 30 31 32 33 34 35 36 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 28
#
# Build an empty execution group.
#
# @param new_group_id [Object, nil] value stored as the group id
def initialize(new_group_id = nil)
  # Identity and scheduling settings.
  @group_id    = new_group_id
  @concurrency = 1

  # Work tracking: the pending queue plus an index of every job by id.
  @queue      = []
  @jobs_by_id = {}

  # Timing and logging state start out unset.
  @time_start = nil
  @time_end   = nil
  @log        = nil
end
Instance Attribute Details
#concurrency ⇒ Object
Returns the value of attribute concurrency.
25 26 27 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 25
#
# Reader for the concurrency attribute.
#
# @return [Object] the current value of @concurrency
def concurrency
  @concurrency
end
#description ⇒ Object
Returns the value of attribute description.
25 26 27 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 25
#
# Reader for the description attribute.
#
# @return [Object] the current value of @description
def description
  @description
end
#group_id ⇒ Object
Returns the value of attribute group_id.
25 26 27 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 25
#
# Reader for the group_id attribute.
#
# @return [Object] the current value of @group_id
def group_id
  @group_id
end
#time_end ⇒ Object (readonly)
Returns the value of attribute time_end.
26 27 28 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 26
#
# Reader for the time_end attribute (read-only).
#
# @return [Object] the current value of @time_end
def time_end
  @time_end
end
#time_start ⇒ Object (readonly)
Returns the value of attribute time_start.
26 27 28 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 26
#
# Reader for the time_start attribute (read-only).
#
# @return [Object] the current value of @time_start
def time_start
  @time_start
end
Instance Method Details
#cancel(id) ⇒ Object
Cancel a job by id
227 228 229 230 231 232 233 234 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 227
#
# Cancel a job by id.
#
# The job is marked with Executor::STATUS_ERROR, detached from its owner,
# stamped with an end time and removed from the pending queue.
#
# @param id [Object] job id, as used for the keys of @jobs_by_id
# @return [Object, nil] the job removed from the queue; nil when the id is
#   unknown or the job was not queued
def cancel(id)
  Log.warn "Cancelling job id #{id}"
  job = @jobs_by_id[id]

  # An unknown id previously crashed with NoMethodError on nil; report it
  # and leave the group untouched instead.
  if job.nil?
    Log.warn "Cannot cancel job id #{id}: no such job"
    return nil
  end

  job.status = Executor::STATUS_ERROR
  job.owner = nil
  job.time_end = Time.now
  @queue.delete(job)
end
#done? ⇒ Boolean
An execution group is “done” if nothing is queued or running and at least one job has completed.
171 172 173 174 175 176 177 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 171
#
# An execution group is "done" when no job has status STATUS_NONE or
# STATUS_RUNNING and at least one job has status STATUS_DONE.
#
# @return [Boolean]
def done?
  return false unless get_jobs_by_status(Executor::STATUS_NONE).empty?
  return false unless get_jobs_by_status(Executor::STATUS_RUNNING).empty?

  !get_jobs_by_status(Executor::STATUS_DONE).empty?
end
#get_job(i) ⇒ Object
Get a particular job
130 131 132 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 130
#
# Look up a single job in the id index.
#
# @param i [Object] job id used as the key into @jobs_by_id
# @return [Object, nil] the job stored under that id, or nil if none
def get_job(i)
  @jobs_by_id[i]
end
#get_job_ids ⇒ Object
Get all job ids
123 124 125 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 123
#
# List the ids of every job known to this group.
#
# @return [Array] the keys of @jobs_by_id
def get_job_ids
  @jobs_by_id.keys
end
#get_jobs ⇒ Object
Get all jobs
116 117 118 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 116
#
# List every job known to this group.
#
# @return [Array] the values of @jobs_by_id
def get_jobs
  @jobs_by_id.values
end
#get_jobs_by_status(status) ⇒ Object
Get jobs by status
137 138 139 140 141 142 143 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 137
#
# Select the jobs whose status matches the requested one.
#
# @param status [#to_sym] status to match; :all selects every job
# @return [Array] a new array of the matching jobs
def get_jobs_by_status(status)
  @jobs_by_id.values.select do |job|
    wanted = status.to_sym
    job.status == wanted || wanted == :all
  end
end
#get_total_exec_time ⇒ Object
Return total execution time
239 240 241 242 243 244 245 246 247 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 239
#
# Return the total execution time as a difference of integer timestamps.
#
# @return [Integer] 0 when @time_start is unset; otherwise @time_end (or the
#   current time while @time_end is unset) minus @time_start, both via to_i
def get_total_exec_time
  return 0 if @time_start.nil?

  # With no recorded end time, measure against the current time.
  finish = @time_end.nil? ? Time.now : @time_end
  finish.to_i - @time_start.to_i
end
#job_completed ⇒ Object
145 146 147 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 145
#
# Record the current time as this group's end time.
#
# @return [Time] the timestamp stored in @time_end
def job_completed
  @time_end = Time.now
end
#push(j) ⇒ Object
Add something to the work queue
41 42 43 44 45 46 47 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 41
#
# Add a job to the work queue and to the id index.
#
# A job without a job_id gets one from IDManager.get, and its group is set
# to this object.
#
# @param j [Object] the job to enqueue
# @return [Object] the job that was pushed
# @raise [RuntimeError] "invalid work" when j is nil
def push(j)
  raise "invalid work" if j.nil?

  j.job_id = IDManager.get if j.job_id.nil?
  j.group = self
  @queue << j
  @jobs_by_id[j.job_id] = j
end
#queue(id) ⇒ Object
Queue a held job by id
201 202 203 204 205 206 207 208 209 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 201
#
# Queue a held job by id.
#
# Resets the job's owner, timestamps and status (to Executor::STATUS_NONE)
# so it can be picked up again.
#
# @param id [Object] job id, as used for the keys of @jobs_by_id
# @return [Object, nil] the job that was queued, or nil when the id is unknown
def queue(id)
  Log.debug "Queuing held job id #{id}"
  job = @jobs_by_id[id]

  # An unknown id previously crashed with NoMethodError on nil.
  if job.nil?
    Log.warn "Cannot queue job id #{id}: no such job"
    return nil
  end

  job.owner = nil
  job.time_start = Time.now
  job.time_end = nil
  job.status = Executor::STATUS_NONE

  # #shift leaves jobs in STATUS_HOLDING inside @queue, so a held job is
  # normally still queued; pushing it again would create a second entry.
  push(job) unless @queue.any? { |queued| queued.equal?(job) }
  job
end
#ready? ⇒ Boolean
An execution group is “ready” if it has work that can be done; see implementation in child classes.
163 164 165 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 163
#
# Placeholder for the "ready" check; this base version always raises.
#
# @return [Boolean] never returns from this implementation
# @raise [RuntimeError] always, with the message "unimplemented"
def ready?
  raise RuntimeError, "unimplemented"
end
#requeue(id) ⇒ Object
Requeue a job by id
214 215 216 217 218 219 220 221 222 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 214
#
# Requeue a job by id.
#
# Resets the job's status (to Executor::STATUS_NONE), owner and timestamps
# and puts it back on the work queue.
#
# @param id [Object] job id, as used for the keys of @jobs_by_id
# @return [Object, nil] the job that was requeued, or nil when the id is
#   unknown
def requeue(id)
  Log.debug "Requeuing job id #{id}"
  job = @jobs_by_id[id]

  # An unknown id previously crashed with NoMethodError on nil.
  if job.nil?
    Log.warn "Cannot requeue job id #{id}: no such job"
    return nil
  end

  job.status = Executor::STATUS_NONE
  job.owner = nil
  job.time_start = Time.now
  job.time_end = nil

  # Do not add a second queue entry for a job that is still waiting.
  push(job) unless @queue.any? { |queued| queued.equal?(job) }
  job
end
#requeue_failed_jobs! ⇒ Object
Requeue all failed jobs
192 193 194 195 196 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 192
#
# Requeue every job whose status is Executor::STATUS_ERROR.
#
# @return [Array] the jobs that were in the error status
def requeue_failed_jobs!
  failed = get_jobs_by_status(Executor::STATUS_ERROR)
  failed.each { |job| requeue(job.job_id) }
end
#reset! ⇒ Object
Reset the queue
109 110 111 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 109
#
# Reset the queue by replacing it with a new empty array.
# Only @queue is reassigned here.
#
# @return [Array] the new, empty queue
def reset!
  @queue = []
end
#results ⇒ Object
Return a hash of the results
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 70
#
# Return one result hash per job.
#
# Jobs that are nil, or whose server is nil, produce a nil entry in the
# returned array rather than being dropped.
#
# NOTE(review): get_total_execution_time (three arguments) is not among the
# methods shown for this class; only get_total_exec_time (no arguments) is.
# Confirm it is defined elsewhere.
#
# @return [Array<Hash, nil>] one entry per job, in get_jobs order
def results
  get_jobs.map do |task|
    if task.nil? || task.server.nil?
      nil
    else
      {
        :job_id => task.job_id,
        :name   => task.info[0],
        :host   => task.server.name,
        :status => task.status,
        :error  => task.error,
        :total  => get_total_execution_time(task.status, task.time_start, task.time_end),
        :start  => task.time_start,
        :end    => task.time_end,
        :worker => task
      }
    end
  end
end
#running? ⇒ Boolean
Is this execution group running anything?
182 183 184 185 186 187 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 182
#
# Is this execution group running anything?
#
# True when at least one job has status STATUS_NONE, STATUS_RUNNING or
# STATUS_RETRYING.
#
# @return [Boolean]
def running?
  active_statuses = [
    Executor::STATUS_NONE,
    Executor::STATUS_RUNNING,
    Executor::STATUS_RETRYING
  ]
  active_statuses.any? { |status| !get_jobs_by_status(status).empty? }
end
#set_jobs(jobs = []) ⇒ Object
Reset all jobs and bulk set them
152 153 154 155 156 157 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 152
#
# Reset the queue, then push each of the given jobs in order.
#
# @param jobs [Array] jobs to enqueue; defaults to an empty list
# @return [Array] the jobs argument
def set_jobs(jobs = [])
  reset!
  jobs.each { |job| push(job) }
end
#shift ⇒ Object
Take something from the queue
52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 52
#
# Take the next runnable job from the queue.
#
# The first job with status Executor::STATUS_NONE is taken. Jobs ahead of it
# are kept only if their status is Executor::STATUS_HOLDING; every job after
# it is kept as is. The first call also records @time_start.
#
# @return [Object, nil] the job taken, or nil when nothing is runnable
def shift
  next_job = nil

  @queue = @queue.select do |job|
    if next_job || job.status == Executor::STATUS_HOLDING
      true
    else
      # Not held and nothing taken yet: take it if runnable, drop it either way.
      next_job = job if job.status == Executor::STATUS_NONE
      false
    end
  end

  @time_start = Time.now if @time_start.nil?
  next_job
end
#size ⇒ Object
Size of the active queue
91 92 93 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 91
#
# Size of the active queue.
#
# @return [Integer] number of entries currently in @queue
def size
  @queue.length
end
#sort! ⇒ Object
Sort queue by server nickname
98 99 100 101 102 103 104 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 98
#
# Sort the queue in place by each job's server nickname.
#
# @return [Array, nil] the sorted queue, or nil when @queue is nil
def sort!
  return nil if @queue.nil?

  @queue.sort! { |left, right| left.server.nickname <=> right.server.nickname }
end
#to_s ⇒ Object
Print out ExecutionGroup information
252 253 254 |
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 252
#
# Describe this group: class name, group id, readiness, the number of known
# jobs and the number of queued jobs.
#
# @return [String]
def to_s
  label = "#{self.class}[#{group_id}]"
  "#{label}: ready=#{ready?} total_jobs=#{@jobs_by_id.size} queued_jobs=#{size}"
end