Class: Chimp::ChimpQueue

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/right_chimp/queue/chimp_queue.rb

Overview

The ChimpQueue is a singleton that contains the chimp work queue

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ChimpQueue

Returns a new instance of ChimpQueue.



11
12
13
14
15
16
17
18
19
20
# File 'lib/right_chimp/queue/chimp_queue.rb', line 11

# Set up the queue's default state and create the :default group.
#
# Defaults: up to 10 threads, a delay of 0, a retry count of 0. The delay and
# retry count are handed to every QueueWorker spawned by #start.
def initialize
  # Worker tuning knobs.
  @max_threads = 10
  @delay = 0
  @retry_count = 0
  # NOTE(review): not read by any method visible on this page — confirm where it is consumed.
  @workers_never_exit = true

  # Internal bookkeeping.
  @semaphore = Mutex.new # serializes #shift
  @threads = []          # filled by #start
  @processing = {}

  # Builds @group with its :default entry.
  reset!
end

Instance Attribute Details

#delay ⇒ Object

Returns the value of attribute delay.



9
10
11
# File 'lib/right_chimp/queue/chimp_queue.rb', line 9

# Reader for @delay.
#
# The constructor initializes @delay to 0, and #start copies the current value
# onto every QueueWorker it spawns (worker.delay = @delay).
# NOTE(review): the unit (presumably seconds) is not visible here — confirm against QueueWorker.
#
# @return the current value of @delay
def delay
  @delay
end

#group ⇒ Object

Returns the value of attribute group.



9
10
11
# File 'lib/right_chimp/queue/chimp_queue.rb', line 9

# Reader for @group, the table of execution groups keyed by group name.
#
# #reset! replaces it with a fresh Hash that contains only the :default group;
# ChimpQueue.[] and ChimpQueue.[]= read and write entries through this reader.
#
# @return [Hash] the group table
def group
  @group
end

#max_threads ⇒ Object

Returns the value of attribute max_threads.



9
10
11
# File 'lib/right_chimp/queue/chimp_queue.rb', line 9

# Reader for @max_threads.
#
# The constructor initializes it to 10; #start spawns one worker thread for
# each value in 1..max_threads.
#
# @return the current value of @max_threads
def max_threads
  @max_threads
end

#processing ⇒ Object

Returns the value of attribute processing.



9
10
11
# File 'lib/right_chimp/queue/chimp_queue.rb', line 9

# Reader for @processing.
#
# The constructor initializes it to an empty Hash.
# NOTE(review): no method visible on this page reads or writes its entries —
# confirm what the keys and values are before relying on it.
#
# @return the current value of @processing
def processing
  @processing
end

#retry_count ⇒ Object

Returns the value of attribute retry_count.



9
10
11
# File 'lib/right_chimp/queue/chimp_queue.rb', line 9

# Reader for @retry_count.
#
# The constructor initializes it to 0, and #start copies the current value
# onto every QueueWorker it spawns (worker.retry_count = @retry_count).
#
# @return the current value of @retry_count
def retry_count
  @retry_count
end

Class Method Details

.[](group) ⇒ Object

Allow the groups to be accessed as ChimpQueue[group]



139
140
141
# File 'lib/right_chimp/queue/chimp_queue.rb', line 139

# Look up an execution group by name on the singleton queue, so callers can
# write ChimpQueue[name].
#
# @param group key of the group in the singleton's group table
# @return the stored group, or whatever the table yields for an unknown key
def self.[](group)
  groups = ChimpQueue.instance.group
  groups[group]
end

.[]=(k, v) ⇒ Object



143
144
145
# File 'lib/right_chimp/queue/chimp_queue.rb', line 143

# Register (or replace) an execution group on the singleton queue, so callers
# can write ChimpQueue[name] = group.
#
# @param k key under which the group is stored
# @param v the group object to store
def self.[]=(k, v)
  registry = ChimpQueue.instance.group
  registry[k] = v
end

Instance Method Details

#create_group(name, type = :parallel, concurrency = 1) ⇒ Object



58
59
60
61
62
63
64
# File 'lib/right_chimp/queue/chimp_queue.rb', line 58

# Build a new execution group and register it under +name+.
#
# @param name key for the group; also assigned as the group's group_id
# @param type group type passed to ExecutionGroupFactory.from_type (default :parallel)
# @param concurrency value assigned to the group's concurrency (default 1)
# @return the newly created group
def create_group(name, type = :parallel, concurrency = 1)
  Log.debug "Creating new execution group #{name} type=#{type} concurrency=#{concurrency}"

  ExecutionGroupFactory.from_type(type).tap do |fresh|
    fresh.group_id = name
    fresh.concurrency = concurrency
    # Registering replaces any group already stored under this name.
    ChimpQueue[name] = fresh
  end
end

#get_job(id) ⇒ Object



163
164
165
166
167
168
169
# File 'lib/right_chimp/queue/chimp_queue.rb', line 163

# Find a single job, across every group, by its numeric id.
#
# The previous implementation fell out of an `each` loop when nothing matched
# and therefore returned the whole jobs array (always truthy) instead of
# signalling "not found". This version returns nil in that case.
#
# @param id job id; it is converted with #to_i before comparison, so numeric
#   strings such as "12" are accepted
# @return the first job whose job_id equals id.to_i, or nil when none matches
def get_job(id)
  get_jobs.find { |job| job.job_id == id.to_i }
end

#get_jobs ⇒ Object



171
172
173
174
175
176
177
178
# File 'lib/right_chimp/queue/chimp_queue.rb', line 171

# Gather the jobs of every execution group into one flat array.
#
# Groups are visited in the order of @group's values, and each group's jobs
# are appended in the order its own get_jobs yields them.
#
# @return [Array] all jobs from all groups (empty when there are none)
def get_jobs
  @group.values.each_with_object([]) do |group, collected|
    group.get_jobs.each { |job| collected << job }
  end
end

#get_jobs_by_status(status) ⇒ Object

Return an array of all jobs with the requested status.



151
152
153
154
155
156
157
158
159
160
161
# File 'lib/right_chimp/queue/chimp_queue.rb', line 151

# Collect, across every execution group, the jobs that have the given status.
#
# @param status status value forwarded unchanged to each group's get_jobs_by_status
# @return [Array] the concatenated matches; groups that answer nil or [] contribute nothing
def get_jobs_by_status(status)
  @group.values.each_with_object([]) do |group, matches|
    found = group.get_jobs_by_status(status)
    next if found.nil? || found == []

    matches.concat(found)
  end
end

#get_jobs_by_uuid(uuid) ⇒ Object



180
181
182
183
184
185
186
187
188
189
# File 'lib/right_chimp/queue/chimp_queue.rb', line 180

# Return every job, across all groups, whose job_uuid equals +uuid+.
#
# @param uuid value compared (with ==) against each job's job_uuid
# @return [Array] matching jobs in queue order; empty when nothing matches
def get_jobs_by_uuid(uuid)
  get_jobs.select { |job| job.job_uuid == uuid }
end

#push(g, w) ⇒ Object

Push a task into the queue



51
52
53
54
55
56
# File 'lib/right_chimp/queue/chimp_queue.rb', line 51

# Push a work item onto a group, creating the group on first use.
#
# The item is ignored when the group already holds a job with the same job_id.
#
# @param g name of the target group; must be truthy
# @param w work item; must respond to job_id
# @raise [RuntimeError] "no group specified" when g is nil or false
def push(g, w)
  raise "no group specified" unless g

  create_group(g) unless ChimpQueue[g]

  target = ChimpQueue[g]
  target.push(w) unless target.get_job(w.job_id)
end

#quit ⇒ Object

Quit - drain the queue: wait (for a bounded time) for remaining jobs to complete, then kill the worker threads



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/right_chimp/queue/chimp_queue.rb', line 98

# Give running groups a bounded amount of time to finish, then kill all
# worker threads.
#
# Progress is printed to standard output: one "." per wait tick, then " done.".
# The tick counter is shared across groups, so at most 30 ticks are spent in
# total, not 30 per group.
def quit
  ticks = 0

  @group.keys.each do |name|
    wait_until_done(name) do
      # `break` here aborts the wait_until_done call for this group.
      break if ticks >= 30

      sleep 1
      ticks += 1
      print "."
    end
  end

  @threads.each(&:kill)
  puts " done."
end

#reset! ⇒ Object

Reset the queue and the :default group

This doesn’t do anything to the groups’ jobs



27
28
29
30
# File 'lib/right_chimp/queue/chimp_queue.rb', line 27

# Throw away the current group table and start over with a single, freshly
# built :default parallel group.
#
# Group objects that were in the old table are simply dropped from it;
# nothing is called on them.
#
# @return the new :default group
def reset!
  @group = {}
  @group.store(:default, ParallelExecutionGroup.new(:default))
end

#run_threads ⇒ Object

Run all threads forever (used by chimpd)



119
120
121
122
123
# File 'lib/right_chimp/queue/chimp_queue.rb', line 119

# Join each worker thread in turn, waiting at most 5 seconds per thread.
#
# NOTE(review): a single call makes one pass over the threads; to keep
# workers running "forever" the caller presumably invokes this in a loop — confirm.
#
# @return [Array] the @threads array
def run_threads
  @threads.each { |thread| thread.join(5) }
end

#shift ⇒ Object

Grab the oldest work item available



69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/right_chimp/queue/chimp_queue.rb', line 69

# Take the next work item from the first group that reports itself ready.
#
# The whole lookup runs under @semaphore. Only the first ready group is
# consulted: if its shift yields nil, later groups are not tried in this call.
#
# @return the shifted job, or nil when no group is ready or the ready group
#   had nothing to hand out
def shift
  @semaphore.synchronize do
    source = @group.values.find { |candidate| candidate.ready? }
    job = source && source.shift
    Log.debug "Shifting job '#{job.job_id}' from group '#{source.group_id}'" unless job.nil?
    job
  end
end

#size ⇒ Object

Return the total number of queued (non-executing) objects



128
129
130
131
132
133
134
# File 'lib/right_chimp/queue/chimp_queue.rb', line 128

# Add up the sizes reported by every execution group.
#
# @return the sum of group.size over all groups (0 when there are no groups)
def size
  @group.values.inject(0) { |total, group| total + group.size }
end

#start ⇒ Object

Start up queue runners



35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/right_chimp/queue/chimp_queue.rb', line 35

# Sort the queues, then spawn max_threads worker threads.
#
# Each thread builds its own QueueWorker, copies the queue's current @delay
# and @retry_count onto it, and calls its run method. The threads are
# appended to @threads, so calling this again adds further threads.
#
# @return [Range] the 1..max_threads range that was iterated
def start
  self.sort_queues!

  (1..max_threads).each do |slot|
    runner = Thread.new(slot) do
      worker = QueueWorker.new
      worker.delay = @delay
      worker.retry_count = @retry_count
      worker.run
    end
    @threads << runner
  end
end

#wait_until_done(g, &block) ⇒ Object

Wait until a group is done



86
87
88
89
90
91
92
93
# File 'lib/right_chimp/queue/chimp_queue.rb', line 86

# Block until the named group stops reporting that it is running.
#
# While the group is running, each worker thread is joined for up to one
# second; after each join the caller's block (if one was given) is invoked.
# The block may `break` to abandon the wait early, as #quit does.
#
# The block is optional: previously a call without a block raised
# LocalJumpError on the first yield, now it simply waits.
#
# NOTE(review): when @threads is empty the loop re-checks running? without
# pausing and without invoking the block, so it spins for as long as the
# group keeps reporting that it is running.
#
# @param g name of the group to wait for; it must exist in @group
# @yield called once per worker thread on every pass, after that thread's join
# @return [nil]
def wait_until_done(g, &block)
  while @group[g].running?
    @threads.each do |t|
      t.join(1)
      yield if block_given?
    end
  end
end