Class: Chimp::ChimpQueue
- Inherits:
-
Object
- Object
- Chimp::ChimpQueue
- Includes:
- Singleton
- Defined in:
- lib/right_chimp/queue/ChimpQueue.rb
Overview
The ChimpQueue is a singleton that contains the chimp work queue
Instance Attribute Summary collapse
-
#delay ⇒ Object
Returns the value of attribute delay.
-
#group ⇒ Object
Returns the value of attribute group.
-
#max_threads ⇒ Object
Returns the value of attribute max_threads.
-
#retry_count ⇒ Object
Returns the value of attribute retry_count.
Class Method Summary collapse
-
.[](group) ⇒ Object
Allow the groups to be accessed as ChimpQueue[group].
- .[]=(k, v) ⇒ Object
Instance Method Summary collapse
- #create_group(name, type = :parallel, concurrency = 1) ⇒ Object
- #get_job(id) ⇒ Object
- #get_jobs ⇒ Object
-
#get_jobs_by_status(status) ⇒ Object
Return an array of all jobs with the requested status.
-
#initialize ⇒ ChimpQueue
constructor
A new instance of ChimpQueue.
-
#push(g, w) ⇒ Object
Push a task into the queue.
-
#quit ⇒ Object
Quit - empty the queue and wait for remaining jobs to complete.
-
#reset! ⇒ Object
Reset the queue and the :default group.
-
#run_threads ⇒ Object
Join every worker thread, waiting up to 5 seconds on each (used by chimpd).
-
#shift ⇒ Object
Grab the oldest work item available.
-
#size ⇒ Object
Return the total number of queued (non-executing) objects.
-
#start ⇒ Object
Start up queue runners.
-
#wait_until_done(g, &block) ⇒ Object
Wait until a group is done.
Constructor Details
#initialize ⇒ ChimpQueue
Returns a new instance of ChimpQueue.
11 12 13 14 15 16 17 18 19 |
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 11
#
# Set up a fresh queue with its default settings.
#
# Defaults assigned here: no delay, no retries, at most 10 worker threads,
# an empty thread list and a new Mutex stored in @semaphore. Finishes by
# calling reset! so that @group is populated.
def initialize
  @semaphore = Mutex.new
  @threads = []
  @workers_never_exit = true
  @max_threads = 10
  @retry_count = 0
  @delay = 0
  reset!
end
Instance Attribute Details
#delay ⇒ Object
Returns the value of attribute delay.
9 10 11 |
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 9
#
# Reader for the delay attribute.
#
# @return [Object] the current value of @delay
def delay
  @delay
end
#group ⇒ Object
Returns the value of attribute group.
9 10 11 |
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 9
#
# Reader for the group attribute.
#
# @return [Object] the current value of @group
def group
  @group
end
#max_threads ⇒ Object
Returns the value of attribute max_threads.
9 10 11 |
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 9
#
# Reader for the max_threads attribute.
#
# @return [Object] the current value of @max_threads
def max_threads
  @max_threads
end
#retry_count ⇒ Object
Returns the value of attribute retry_count.
9 10 11 |
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 9
#
# Reader for the retry_count attribute.
#
# @return [Object] the current value of @retry_count
def retry_count
  @retry_count
end
Class Method Details
.[](group) ⇒ Object
Allow the groups to be accessed as ChimpQueue[group]
137 138 139 |
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 137
#
# Class-level shortcut for reading a group, so callers can write
# ChimpQueue[name] instead of going through the singleton instance.
#
# @param group [Object] key to look up in the singleton's group hash
# @return [Object] whatever the group hash holds under that key
def self.[](group)
  groups = ChimpQueue.instance.group
  groups[group]
end
.[]=(k, v) ⇒ Object
141 142 143 |
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 141
#
# Class-level shortcut for storing a group, so callers can write
# ChimpQueue[name] = group.
#
# @param k [Object] key under which to store the value in the singleton's group hash
# @param v [Object] value to store
# @return [Object] the stored value
def self.[]=(k, v)
  groups = ChimpQueue.instance.group
  groups[k] = v
end
Instance Method Details
#create_group(name, type = :parallel, concurrency = 1) ⇒ Object
56 57 58 59 60 61 62 |
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 56
#
# Create an execution group and register it under +name+.
#
# The group object comes from ExecutionGroupFactory.from_type(type); its
# group_id and concurrency are then assigned and it is stored via
# ChimpQueue[name]=.
#
# @param name [Object] key and group_id for the new group
# @param type [Symbol] passed to ExecutionGroupFactory.from_type (default :parallel)
# @param concurrency [Integer] assigned to the group's concurrency (default 1)
# @return [Object] the newly created group
def create_group(name, type = :parallel, concurrency = 1)
  Log.debug "Creating new execution group #{name} type=#{type} concurrency=#{concurrency}"
  ChimpQueue[name] = ExecutionGroupFactory.from_type(type).tap do |fresh|
    fresh.group_id = name
    fresh.concurrency = concurrency
  end
end
#get_job(id) ⇒ Object
161 162 163 164 165 166 167 |
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 161
#
# Find a job by id across every execution group.
#
# The candidates come from #get_jobs; each job's job_id is compared to +id+
# with ==.
#
# @param id [Object] the job id to look for
# @return [Object, nil] the first job whose job_id equals +id+, or nil when
#   there is no such job. (An each/return loop would instead fall through to
#   the value of +each+ -- the whole jobs array -- which is truthy on a miss.)
def get_job(id)
  get_jobs.find { |job| job.job_id == id }
end
#get_jobs ⇒ Object
169 170 171 172 173 174 175 176 |
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 169
#
# Collect the jobs of every group into one flat array.
#
# Groups are visited in the order of @group.values and each group's
# get_jobs result is appended element by element.
#
# @return [Array] all jobs from all groups (empty when there are none)
def get_jobs
  @group.values.each_with_object([]) do |grp, collected|
    grp.get_jobs.each { |job| collected << job }
  end
end
#get_jobs_by_status(status) ⇒ Object
Return an array of all jobs with the requested status.
149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 149
#
# Gather the jobs with the given status from every group.
#
# Each group is asked via its own get_jobs_by_status(status); nil and empty
# answers are skipped, everything else is appended in group order.
#
# @param status [Object] status value forwarded to each group
# @return [Array] the matching jobs from all groups (empty when none match)
def get_jobs_by_status(status)
  @group.values.each_with_object([]) do |grp, matches|
    found = grp.get_jobs_by_status(status)
    next if found.nil? || found == []

    matches.concat(found)
  end
end
#push(g, w) ⇒ Object
Push a task into the queue
50 51 52 53 54 |
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 50
#
# Queue a work item in the named group.
#
# The group is created with create_group's defaults if ChimpQueue[g] is not
# set yet. The item is skipped when the group already reports a job with the
# same job_id.
#
# @param g [Object] group key; must be truthy
# @param w [Object] work item; must respond to job_id
# @return [Object, nil] the result of the group's push, or nil when the job
#   was already present
# @raise [RuntimeError] "no group specified" when +g+ is nil or false
def push(g, w)
  raise "no group specified" unless g

  create_group(g) unless ChimpQueue[g]
  return if ChimpQueue[g].get_job(w.job_id)

  ChimpQueue[g].push(w)
end
#quit ⇒ Object
Quit - empty the queue and wait for remaining jobs to complete
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 96
#
# Shut the queue down: give running groups a bounded amount of time to
# finish, then kill every worker thread.
#
# Each group is waited on through wait_until_done. The block passed to it
# sleeps one second and prints a "." per call, and the counter is shared
# across all groups, so at most 30 such ticks happen in total before the
# block breaks out of the wait. Prints " done." at the end.
#
# @return [nil]
def quit
  ticks = 0
  @group.keys.each do |name|
    wait_until_done(name) do
      # break terminates the wait_until_done call for this group
      break unless ticks < 30

      sleep 1
      ticks += 1
      print "."
    end
  end
  @threads.each(&:kill)
  puts " done."
end
#reset! ⇒ Object
Reset the queue and the :default group
This doesn’t do anything to the groups’ jobs
26 27 28 29 |
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 26
#
# Replace the group table with a fresh one that holds only a new :default
# ParallelExecutionGroup. Groups in the previous table are simply dropped
# from @group; nothing is called on them.
#
# @return [Object] the new :default group
def reset!
  @group = {}
  @group.store(:default, ParallelExecutionGroup.new(:default))
end
#run_threads ⇒ Object
Join every worker thread, waiting up to 5 seconds on each (used by chimpd)
117 118 119 120 121 |
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 117
#
# Join each worker thread in turn, waiting at most 5 seconds on each one.
# A single call therefore returns even if the workers keep running.
#
# @return [Array<Thread>] the @threads array
def run_threads
  @threads.each { |worker| worker.join(5) }
end
#shift ⇒ Object
Grab the oldest work item available
67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 67
#
# Take one work item from the first group that reports ready?.
#
# Groups are checked in the order of @group.values; the scan stops at the
# first ready group, whose shift result is logged and returned. The whole
# lookup runs inside @semaphore.synchronize.
#
# @return [Object, nil] the shifted job, or nil when no group is ready
def shift
  @semaphore.synchronize do
    ready = @group.values.find(&:ready?)
    if ready
      job = ready.shift
      Log.debug "Shifting job '#{job.job_id}' from group '#{ready.group_id}'"
      job
    end
  end
end
#size ⇒ Object
Return the total number of queued (non-executing) objects
126 127 128 129 130 131 132 |
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 126
#
# Total the size reported by every group.
#
# @return [Integer] sum of group.size over all groups (0 when there are no groups)
def size
  @group.values.inject(0) { |total, grp| total + grp.size }
end
#start ⇒ Object
Start up queue runners
34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 34
#
# Launch the worker threads.
#
# Calls sort_queues! first, then starts max_threads threads. Each thread
# builds its own QueueWorker, copies the queue's delay and retry_count onto
# it and calls run. The threads are appended to @threads.
#
# @return [Range] the range 1..max_threads that was iterated
def start
  sort_queues!

  (1..max_threads).each do |slot|
    @threads << Thread.new(slot) do
      runner = QueueWorker.new
      runner.delay = @delay
      runner.retry_count = @retry_count
      runner.run
    end
  end
end
#wait_until_done(g, &block) ⇒ Object
Wait until a group is done
84 85 86 87 88 89 90 91 |
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 84
#
# Block until the group stored under +g+ stops reporting running?.
#
# While the group is running, every worker thread is joined for up to one
# second and the optional block is called once after each join, so callers
# can report progress or +break+ out of the wait. The block is optional:
# without one the method just waits.
#
# NOTE(review): when @threads is empty the loop re-checks running? without
# pausing and never calls the block -- confirm callers always start workers
# before waiting.
#
# @param g [Object] key of the group in @group
# @yield called once per worker thread on every pass of the wait loop
# @return [nil]
def wait_until_done(g, &block)
  while @group[g].running?
    @threads.each do |t|
      t.join(1)
      # A bare yield raises LocalJumpError when the caller passes no block.
      yield if block_given?
    end
  end
end