Class: Chimp::ExecutionGroup

Inherits:
Object
  • Object
show all
Defined in:
lib/right_chimp/queue/ExecutionGroup.rb

Overview

An ExecutionGroup contains a set of Executors to be processed.

Only the subclasses SerialExecutionGroup and ParallelExecutionGroup should be used directly.

Direct Known Subclasses

ParallelExecutionGroup, SerialExecutionGroup

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(new_group_id = nil) ⇒ ExecutionGroup

Returns a new instance of ExecutionGroup.



28
29
30
31
32
33
34
35
36
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 28

# Build an empty execution group.
#
# @param new_group_id [Object, nil] identifier stored as this group's id
def initialize(new_group_id=nil)
  # Identity and scheduling settings.
  @group_id    = new_group_id
  @concurrency = 1

  # Pending work, plus an index of every pushed job keyed by job id.
  @queue      = []
  @jobs_by_id = {}

  # No logger and no timing information yet.
  @log = @time_start = @time_end = nil
end

Instance Attribute Details

#concurrency ⇒ Object

Returns the value of attribute concurrency.



25
26
27
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 25

# Reader for the concurrency attribute.
#
# @return [Object] the current value of @concurrency
def concurrency; @concurrency; end

#description ⇒ Object

Returns the value of attribute description.



25
26
27
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 25

# Reader for the description attribute.
#
# @return [Object] the current value of @description
def description; @description; end

#group_id ⇒ Object

Returns the value of attribute group_id.



25
26
27
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 25

# Reader for the group_id attribute.
#
# @return [Object] the current value of @group_id
def group_id; @group_id; end

#time_end ⇒ Object (readonly)

Returns the value of attribute time_end.



26
27
28
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 26

# Reader for the time_end attribute.
#
# @return [Object] the current value of @time_end
def time_end; @time_end; end

#time_start ⇒ Object (readonly)

Returns the value of attribute time_start.



26
27
28
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 26

# Reader for the time_start attribute.
#
# @return [Object] the current value of @time_start
def time_start; @time_start; end

Instance Method Details

#cancel(id) ⇒ Object

Cancel a job by id



227
228
229
230
231
232
233
234
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 227

# Cancel a job by id.
#
# Marks the job with Executor::STATUS_ERROR, clears its owner, stamps its end
# time and removes it from the pending queue. The job stays in @jobs_by_id.
#
# @param id [Object] job id, as used for the keys of @jobs_by_id
# @return [Object, nil] the job removed from the queue; nil when the id is
#   unknown or the job was not in the queue
def cancel(id)
  Log.warn "Cancelling job id #{id}"
  job = @jobs_by_id[id]
  if job.nil?
    # Unknown id: nothing to cancel. Without this guard the status assignment
    # below fails with NoMethodError on nil.
    Log.warn "Cannot cancel job id #{id}: no such job in this group"
    return nil
  end

  job.status = Executor::STATUS_ERROR
  job.owner = nil
  job.time_end = Time.now
  @queue.delete(job)
end

#done? ⇒ Boolean

An execution group is “done” if nothing is queued or running and at least one job has completed.

Returns:

  • (Boolean)


172
173
174
175
176
177
178
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 172

# Whether the group has finished: no job has status Executor::STATUS_NONE or
# Executor::STATUS_RUNNING, and at least one job has Executor::STATUS_DONE.
#
# @return [Boolean]
def done?
  return false unless get_jobs_by_status(Executor::STATUS_NONE).empty?
  return false unless get_jobs_by_status(Executor::STATUS_RUNNING).empty?

  !get_jobs_by_status(Executor::STATUS_DONE).empty?
end

#get_job(i) ⇒ Object

Get a particular job



131
132
133
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 131

# Look up a single job in the id index.
#
# @param i [Object] job id (a key of @jobs_by_id)
# @return [Object, nil] whatever @jobs_by_id holds under that key
def get_job(i); @jobs_by_id[i]; end

#get_job_ids ⇒ Object

Get all job ids



124
125
126
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 124

# List the ids of every job in the id index.
#
# @return [Array] the keys of @jobs_by_id
def get_job_ids; @jobs_by_id.keys; end

#get_jobs ⇒ Object

Get all jobs



117
118
119
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 117

# List every job in the id index.
#
# @return [Array] the values of @jobs_by_id
def get_jobs; @jobs_by_id.values; end

#get_jobs_by_status(status) ⇒ Object

Get jobs by status



138
139
140
141
142
143
144
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 138

# Select the indexed jobs whose status matches.
#
# @param status [#to_sym] status to match; :all (or "all") matches every job
# @return [Array] matching jobs, in the order of @jobs_by_id
def get_jobs_by_status(status)
  @jobs_by_id.values.select do |job|
    job.status == status.to_sym || status.to_sym == :all
  end
end

#get_total_exec_time ⇒ Object

Return total execution time



239
240
241
242
243
244
245
246
247
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 239

# Elapsed time for the group, in whole seconds.
#
# @return [Integer] 0 when no start time is recorded; otherwise the difference
#   between the end time (or the current time while no end time is set) and
#   the start time
def get_total_exec_time
  return 0 if @time_start.nil?

  # Still running when no end time is recorded: measure up to now.
  finished_at = @time_end.nil? ? Time.now : @time_end
  finished_at.to_i - @time_start.to_i
end

#job_completed ⇒ Object



146
147
148
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 146

# Record the current time as the group's end time.
#
# @return [Time] the timestamp stored in @time_end
def job_completed; @time_end = Time.now; end

#push(j) ⇒ Object

Add something to the work queue



41
42
43
44
45
46
47
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 41

# Add a job to the work queue and to the id index.
#
# A job without a job_id is given one from IDManager.get, and its group is set
# to this object.
#
# @param j [Object] job responding to job_id, job_id= and group=
# @return [Object] the job that was pushed
# @raise [RuntimeError] "invalid work" when j is nil
def push(j)
  raise "invalid work" if j.nil?

  j.job_id = IDManager.get if j.job_id.nil?
  j.group = self

  # Register in the index and append to the pending queue.
  @jobs_by_id[j.job_id] = j
  @queue << j
  j
end

#queue(id) ⇒ Object

Queue a held job by id



202
203
204
205
206
207
208
209
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 202

# Queue a held job by id.
#
# Clears the job's owner and end time, stamps a fresh start time and sets its
# status to Executor::STATUS_NONE. The job is not re-added to @queue here.
#
# @param id [Object] job id, as used for the keys of @jobs_by_id
# @return [Object, nil] Executor::STATUS_NONE on success; nil when the id is
#   unknown
def queue(id)
  # The message used to read "Requeuing", identical to #requeue, which made
  # the two operations indistinguishable in the debug log.
  Log.debug "Queuing held job id #{id}"
  job = @jobs_by_id[id]
  if job.nil?
    # Unknown id: nothing to queue. Without this guard the assignments below
    # fail with NoMethodError on nil.
    Log.warn "Cannot queue job id #{id}: no such job in this group"
    return nil
  end

  job.owner = nil
  job.time_start = Time.now
  job.time_end = nil
  job.status = Executor::STATUS_NONE
end

#ready? ⇒ Boolean

An execution group is “ready” if it has work that can be done; see implementation in child classes.

Returns:

  • (Boolean)


164
165
166
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 164

# Placeholder in the base class: always raises.
#
# @raise [RuntimeError] "unimplemented"
def ready?
  raise RuntimeError, "unimplemented"
end

#requeue(id) ⇒ Object

Requeue a job by id



214
215
216
217
218
219
220
221
222
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 214

# Requeue a job by id.
#
# Resets the job's status to Executor::STATUS_NONE, clears its owner and end
# time, stamps a fresh start time and pushes it onto the queue again.
#
# @param id [Object] job id, as used for the keys of @jobs_by_id
# @return [Object, nil] the result of #push; nil when the id is unknown
def requeue(id)
  Log.debug "Requeuing job id #{id}"
  job = @jobs_by_id[id]
  if job.nil?
    # Unknown id: nothing to requeue. Without this guard the assignments below
    # fail with NoMethodError on nil.
    Log.warn "Cannot requeue job id #{id}: no such job in this group"
    return nil
  end

  job.status = Executor::STATUS_NONE
  job.owner = nil
  job.time_start = Time.now
  job.time_end = nil
  push(job)
end

#requeue_failed_jobs! ⇒ Object

Requeue all failed jobs



193
194
195
196
197
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 193

# Requeue every job whose status is Executor::STATUS_ERROR.
#
# @return [Array] the failed jobs that were passed to #requeue
def requeue_failed_jobs!
  failed = get_jobs_by_status(Executor::STATUS_ERROR)
  failed.each { |failed_job| requeue(failed_job.job_id) }
end

#reset! ⇒ Object

Reset the queue



110
111
112
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 110

# Replace the pending queue with a new empty array. @jobs_by_id is not touched.
#
# @return [Array] the new, empty queue
def reset!; @queue = []; end

#results ⇒ Object

Return an array of result hashes, one per job (nil for a job that has no server)



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 70

# Summarise every job as a hash.
#
# @return [Array<Hash, nil>] one entry per element of #get_jobs, in the same
#   order; the entry is nil when the job is nil or has no server
def results
  get_jobs.map do |task|
    # Skipped jobs still occupy a slot (as nil) in the returned array.
    next if task.nil? || task.server.nil?

    {
      job_id: task.job_id,
      name:   task.info,
      # Prefer the server's nickname, falling back to its name.
      host:   task.server['nickname'] || task.server['name'],
      status: task.status,
      error:  task.error,
      total:  get_total_execution_time(task.status, task.time_start, task.time_end),
      start:  task.time_start,
      end:    task.time_end,
      worker: task
    }
  end
end

#running? ⇒ Boolean

Is this execution group running anything?

Returns:

  • (Boolean)


183
184
185
186
187
188
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 183

# Whether any job has status Executor::STATUS_NONE, Executor::STATUS_RUNNING
# or Executor::STATUS_RETRYING.
#
# @return [Boolean]
def running?
  active_states = [
    Executor::STATUS_NONE,
    Executor::STATUS_RUNNING,
    Executor::STATUS_RETRYING
  ]
  per_state = active_states.map { |state| get_jobs_by_status(state).size }
  per_state.inject(0) { |total, count| total + count } > 0
end

#set_jobs(jobs = []) ⇒ Object

Reset all jobs and bulk set them



153
154
155
156
157
158
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 153

# Reset the queue, then push each of the given jobs.
#
# @param jobs [Array] jobs to push, in order
# @return [Array] the jobs collection that was passed in
def set_jobs(jobs=[])
  reset!
  jobs.each { |job| push(job) }
end

#shift ⇒ Object

Take something from the queue



52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 52

# Take the next runnable job from the queue.
#
# The first queued job whose status is not Executor::STATUS_HOLDING is removed
# and returned; held jobs keep their place in the queue. The group's start
# time is recorded the first time a job is actually handed out. Polling an
# empty or fully-held queue no longer starts the clock, so it does not
# inflate #get_total_exec_time.
#
# @return [Object, nil] the dequeued job, or nil when nothing is runnable
def shift
  position = @queue.index { |job| job.status != Executor::STATUS_HOLDING }
  return nil if position.nil?

  found_job = @queue.delete_at(position)
  @time_start = Time.now if @time_start.nil?
  found_job
end

#size ⇒ Object

Size of the active queue



92
93
94
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 92

# Number of jobs currently in the pending queue.
#
# @return [Integer]
def size
  @queue.length
end

#sort! ⇒ Object

Sort queue by server nickname



99
100
101
102
103
104
105
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 99

# Sort the queue in place by server nickname.
#
# The sort key matches the host shown by #results: the server's 'nickname',
# falling back to its 'name'. Jobs without a server, or whose server has
# neither key, sort first under an empty key. Comparing raw nicknames raised
# as soon as one of them was nil.
#
# @return [Array, nil] the sorted queue; nil when there is no queue
def sort!
  return nil if @queue.nil?

  @queue.sort_by! do |job|
    server = job.server
    server.nil? ? '' : (server['nickname'] || server['name']).to_s
  end
end

#to_s ⇒ Object

Print out ExecutionGroup information



252
253
254
# File 'lib/right_chimp/queue/ExecutionGroup.rb', line 252

# One-line summary of the group: class, group id, readiness, number of
# indexed jobs and number of queued jobs.
#
# @return [String]
def to_s
  label = "#{self.class}[#{group_id}]"
  "#{label}: ready=#{ready?} total_jobs=#{@jobs_by_id.size} queued_jobs=#{size}"
end