Class: Chimp::ChimpQueue

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/right_chimp/queue/ChimpQueue.rb

Overview

The ChimpQueue is a singleton that contains the chimp work queue

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ChimpQueue

Returns a new instance of ChimpQueue.



11
12
13
14
15
16
17
18
19
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 11

# Establish the queue's defaults: the mutex that serializes #shift, an empty
# worker-thread list, a limit of ten worker threads, and zero delay/retries.
# Finishes by calling #reset! to build the initial group table.
def initialize
  @semaphore = Mutex.new
  @threads = []
  @workers_never_exit = true
  @max_threads = 10
  @retry_count = 0
  @delay = 0
  reset!
end

Instance Attribute Details

#delay ⇒ Object

Returns the value of attribute delay.



9
10
11
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 9

# Reader for +@delay+ (0 by default); #start copies this value onto each
# QueueWorker it creates.
def delay; @delay; end

#group ⇒ Object

Returns the value of attribute group.



9
10
11
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 9

# Reader for +@group+, the Hash of execution groups keyed by group name
# (rebuilt with a single :default entry by #reset!).
def group; @group; end

#max_threads ⇒ Object

Returns the value of attribute max_threads.



9
10
11
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 9

# Reader for +@max_threads+ (10 by default): the number of worker threads
# that #start spawns.
def max_threads; @max_threads; end

#retry_count ⇒ Object

Returns the value of attribute retry_count.



9
10
11
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 9

# Reader for +@retry_count+ (0 by default); #start copies this value onto
# each QueueWorker it creates.
def retry_count; @retry_count; end

Class Method Details

.[](group) ⇒ Object

Allow the groups to be accessed as ChimpQueue[group]



137
138
139
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 137

# Class-level shortcut: look up +group+ in the singleton instance's group
# table and return whatever is stored under that key.
def self.[](group)
  table = ChimpQueue.instance.group
  table[group]
end

.[]=(k, v) ⇒ Object



141
142
143
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 141

# Class-level shortcut: store +v+ under key +k+ in the singleton instance's
# group table.
def self.[]=(k,v)
  table = ChimpQueue.instance.group
  table[k] = v
end

Instance Method Details

#create_group(name, type = :parallel, concurrency = 1) ⇒ Object



56
57
58
59
60
61
62
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 56

# Build an execution group of the requested +type+ through
# ExecutionGroupFactory, label it with +name+, set its +concurrency+, and
# register it under +name+ via ChimpQueue[]=.
#
# @param name key (and group_id) for the new group
# @param type group type handed to ExecutionGroupFactory.from_type
# @param concurrency value assigned to the group's +concurrency+
# @return the newly created group
def create_group(name, type = :parallel, concurrency = 1)
  Log.debug "Creating new execution group #{name} type=#{type} concurrency=#{concurrency}"
  ExecutionGroupFactory.from_type(type).tap do |fresh|
    fresh.group_id = name
    fresh.concurrency = concurrency
    ChimpQueue[name] = fresh
  end
end

#get_job(id) ⇒ Object



161
162
163
164
165
166
167
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 161

# Find a single job by id across every execution group.
#
# @param id value compared (with ==) against each job's +job_id+
# @return the first job from #get_jobs whose +job_id+ equals +id+, or nil
#   when no job matches
def get_job(id)
  # Enumerable#find returns nil on a miss. Iterating with a bare +each+ and
  # falling off the end would return the whole job list instead, which is
  # truthy and easily mistaken for a hit.
  get_jobs.find { |job| job.job_id == id }
end

#get_jobs ⇒ Object



169
170
171
172
173
174
175
176
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 169

# Collect the jobs of every execution group into one flat Array, in the
# order the groups appear in the group table.
#
# @return [Array] every job reported by each group's +get_jobs+
def get_jobs
  @group.values.each_with_object([]) do |grp, collected|
    grp.get_jobs.each { |job| collected << job }
  end
end

#get_jobs_by_status(status) ⇒ Object

Return an array of all jobs with the requested status.



149
150
151
152
153
154
155
156
157
158
159
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 149

# Gather, across all execution groups, the jobs each group reports for
# +status+. Groups that answer nil or an empty Array contribute nothing.
#
# @param status value forwarded to every group's +get_jobs_by_status+
# @return [Array] the concatenated per-group results
def get_jobs_by_status(status)
  @group.values.each_with_object([]) do |grp, matches|
    found = grp.get_jobs_by_status(status)
    next if found.nil? || found == []

    matches.concat(found)
  end
end

#push(g, w) ⇒ Object

Push a task into the queue



50
51
52
53
54
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 50

# Queue work item +w+ in group +g+, creating the group first (with
# #create_group's defaults) when ChimpQueue[g] is not set. The item is
# skipped when the group already holds a job with the same +job_id+.
#
# @param g group key; must be truthy
# @param w work item responding to +job_id+
# @raise [RuntimeError] "no group specified" when +g+ is nil or false
# @return the group's +push+ result, or nil when the job was already queued
def push(g, w)
  raise "no group specified" if !g

  create_group(g) unless ChimpQueue[g]
  return if ChimpQueue[g].get_job(w.job_id)

  ChimpQueue[g].push(w)
end

#quit ⇒ Object

Quit - wait for each group's remaining jobs to complete (giving up after 30 one-second sleeps in total), then kill the worker threads



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 96

# Shut the queue down: wait on every group via #wait_until_done, printing a
# "." and sleeping one second each time the wait yields. The counter is
# shared across groups, so once 30 sleeps have accumulated every remaining
# wait is abandoned as soon as it yields. Afterwards all worker threads are
# killed and " done." is printed.
def quit
  waited = 0
  @group.keys.each do |name|
    wait_until_done(name) do
      # break leaves wait_until_done for this group
      break if waited >= 30

      sleep 1
      waited += 1
      print "."
    end
  end

  @threads.each(&:kill)
  puts " done."
end

#reset! ⇒ Object

Reset the queue and the :default group

This doesn’t do anything to the groups’ jobs



26
27
28
29
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 26

# Replace the group table with a fresh Hash whose only entry is a new
# ParallelExecutionGroup registered under :default.
#
# @return the new :default group
def reset!
  @group = Hash.new
  @group.store(:default, ParallelExecutionGroup.new(:default))
end

#run_threads ⇒ Object

Join each worker thread for up to 5 seconds (used by chimpd, presumably in a loop, to keep the workers running)



117
118
119
120
121
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 117

# Join every worker thread in turn, waiting at most 5 seconds on each.
#
# @return the worker-thread list
def run_threads
  @threads.each { |worker_thread| worker_thread.join(5) }
end

#shift ⇒ Object

Grab the oldest work item available



67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 67

# Take one job from the first group (in group-table order) that reports
# +ready?+, logging which job was taken. The whole lookup runs while holding
# +@semaphore+.
#
# @return the job shifted from that group, or nil when no group is ready
def shift
  @semaphore.synchronize do
    candidate = @group.values.find(&:ready?)
    if candidate
      job = candidate.shift
      Log.debug "Shifting job '#{job.job_id}' from group '#{candidate.group_id}'"
      job
    end
  end
end

#size ⇒ Object

Return the total number of queued (non-executing) objects



126
127
128
129
130
131
132
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 126

# Add up the +size+ reported by every execution group.
#
# @return the sum of all group sizes (0 when there are no groups)
def size
  @group.values.inject(0) { |total, grp| total + grp.size }
end

#start ⇒ Object

Start up queue runners



34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 34

# Sort the queues, then spawn +max_threads+ worker threads. Each thread
# builds its own QueueWorker, copies the queue's delay and retry_count onto
# it, and calls its +run+. Every thread is appended to +@threads+.
def start
  self.sort_queues!

  (1..max_threads).each do
    runner = Thread.new do
      worker = QueueWorker.new
      worker.delay = @delay
      worker.retry_count = @retry_count
      worker.run
    end
    @threads << runner
  end
end

#wait_until_done(g, &block) ⇒ Object

Wait until a group is done



84
85
86
87
88
89
90
91
# File 'lib/right_chimp/queue/ChimpQueue.rb', line 84

# Poll group +g+ until it no longer reports +running?+. On each pass every
# worker thread is joined for up to one second and the caller's block is
# yielded to once per thread; a +break+ inside that block ends the wait.
#
# @param g key of the group to wait on
# @yield once per worker thread on every polling pass
# @return [nil]
def wait_until_done(g, &block)
  while @group[g].running?
    @threads.each { |worker_thread| worker_thread.join(1); yield }
  end
end