Module: BlackStack::Pampa
- Defined in:
- lib/pampa.rb
Defined Under Namespace
Constant Summary collapse
- @@nodes =
arrays of workers, nodes, and jobs.
[]
- @@jobs =
[]
- @@logger =
BlackStack::DummyLogger.new(nil)
- @@dispatcher_function =
nil
- @@worker_function =
nil
Class Method Summary collapse
-
.add_job(h) ⇒ Object
add a job to the cluster.
-
.add_jobs(a) ⇒ Object
add an array of jobs to the cluster.
-
.add_node(h) ⇒ Object
add a node to the cluster.
-
.add_nodes(a) ⇒ Object
add an array of nodes to the cluster.
-
.dispatch ⇒ Object
iterate the workers.
- .dispatcher_function ⇒ Object
-
.jobs ⇒ Object
return the array of jobs.
-
.logger ⇒ Object
get and set logger.
-
.nodes ⇒ Object
return the array of nodes.
-
.relaunch(n = 10000) ⇒ Object
iterate the jobs.
- .set_logger(l) ⇒ Object
- .set_snippets(h) ⇒ Object
-
.stretch ⇒ Object
get attached and unassigned workers.
- .worker_function ⇒ Object
-
.workers ⇒ Object
return the array of all workers, belonging to all nodes.
Class Method Details
.add_job(h) ⇒ Object
add a job to the cluster.
45 46 47 |
# File 'lib/pampa.rb', line 45
# Register a single job in the cluster.
#
# Builds a BlackStack::Pampa::Job from the descriptor `h` and appends it
# to the module-level @@jobs list.
#
# @param h the job descriptor handed to BlackStack::Pampa::Job.new
# @return the @@jobs collection, as returned by the append
def self.add_job(h)
  job = BlackStack::Pampa::Job.new(h)
  @@jobs.push(job)
end
.add_jobs(a) ⇒ Object
add an array of jobs to the cluster.
50 51 52 53 54 55 56 57 58 |
# File 'lib/pampa.rb', line 50
# Register several jobs in the cluster, one add_job call per element.
#
# @param a [Array] job descriptors, each passed to add_job
# @raise [RuntimeError] when `a` is not an Array
# @return [Array] the array `a` that was iterated
def self.add_jobs(a)
  # reject anything that is not an array before touching the job list
  unless a.is_a?(Array)
    raise "The parameter a is not an array"
  end
  # register the jobs in the order given
  a.each { |descriptor| self.add_job(descriptor) }
end
.add_node(h) ⇒ Object
add a node to the cluster.
19 20 21 |
# File 'lib/pampa.rb', line 19
# Register a single node in the cluster.
#
# Builds a BlackStack::Pampa::Node from the descriptor `h` and appends it
# to the module-level @@nodes list.
#
# @param h the node descriptor handed to BlackStack::Pampa::Node.new
# @return the @@nodes collection, as returned by the append
def self.add_node(h)
  node = BlackStack::Pampa::Node.new(h)
  @@nodes.push(node)
end
.add_nodes(a) ⇒ Object
add an array of nodes to the cluster.
24 25 26 27 28 29 30 31 32 |
# File 'lib/pampa.rb', line 24
# Register several nodes in the cluster, one add_node call per element.
#
# @param a [Array] node descriptors, each passed to add_node
# @raise [RuntimeError] when `a` is not an Array
# @return [Array] the array `a` that was iterated
def self.add_nodes(a)
  # reject anything that is not an array before touching the node list
  unless a.is_a?(Array)
    raise "The parameter a is not an array"
  end
  # register the nodes in the order given
  a.each { |descriptor| self.add_node(descriptor) }
end
.dispatch ⇒ Object
iterate the workers. for each worker, iterate the job.
Parameters:
-
config: relative path of the configuration file. Example: ‘../config.rb’
-
worker: relative path of the worker.rb file. Example: ‘../worker.rb’
211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 |
# File 'lib/pampa.rb', line 211
# Iterate every worker of the cluster and, for each attached worker that
# has a job assigned, ask that job to dispatch tasks to the worker.
#
# For each worker one progress line is logged:
# - "detached"   when the worker is not attached;
# - "unassigned" when the worker has no assigned job;
# - "job <name> not found" when the assigned job name matches no
#   registered job;
# - "done (<result>)" with the value returned by job.run_dispatch(worker).
#
# Takes no parameters.
#
# @return the collection returned by BlackStack::Pampa.workers
def self.dispatch()
  # getting logger
  l = self.logger()
  # iterate the workers
  BlackStack::Pampa.workers.each { |worker|
    l.logs("worker:#{worker.id} (job:#{worker.assigned_job.to_s})... ")
    if !worker.attached
      l.logf("detached".green)
    elsif worker.assigned_job.nil?
      l.logf("unassigned".yellow)
    else
      # get the job this worker is assigned to (names compared as strings,
      # so a symbol assignment matches a string job name and vice versa)
      job = BlackStack::Pampa.jobs.find { |j| j.name.to_s == worker.assigned_job.to_s }
      if job.nil?
        # `job` is nil here, so report the name the worker was assigned;
        # calling job.name would raise NoMethodError
        l.logf("job #{worker.assigned_job} not found".red)
      else
        l.logf("done".green + " (#{job.run_dispatch(worker).to_s.blue})")
      end
    end
  }
end
.dispatcher_function ⇒ Object
80 81 82 |
# File 'lib/pampa.rb', line 80
# Reader for the module-level @@dispatcher_function.
#
# @return whatever set_snippets stored under :dispatcher_function
def self.dispatcher_function()
  return @@dispatcher_function
end
.jobs ⇒ Object
return the array of jobs.
61 62 63 |
# File 'lib/pampa.rb', line 61
# Reader for the module-level list of jobs.
#
# @return the @@jobs collection itself (not a copy)
def self.jobs
  return @@jobs
end
.logger ⇒ Object
get and set logger
66 67 68 |
# File 'lib/pampa.rb', line 66
# Reader for the module-level logger.
#
# @return the object currently held in @@logger
def self.logger
  return @@logger
end
.nodes ⇒ Object
return the array of nodes.
35 36 37 |
# File 'lib/pampa.rb', line 35
# Reader for the module-level list of nodes.
#
# @return the @@nodes collection itself (not a copy)
def self.nodes
  return @@nodes
end
.relaunch(n = 10000) ⇒ Object
iterate the jobs. for each job, get all the tasks to relaunch. for each task to relaunch, relaunch it.
Parameters:
-
config: relative path of the configuration file. Example: ‘../config.rb’
-
worker: relative path of the worker.rb file. Example: ‘../worker.rb’
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/pampa.rb', line 184
# Walk every registered job, ask it for the tasks that must be relaunched
# (at most `n` per job) and relaunch them one by one, logging progress.
#
# @param n maximum number of tasks requested from each job via
#   job.relaunching (defaults to 10000)
# @return the collection returned by BlackStack::Pampa.jobs
def self.relaunch(n=10000)
  log = self.logger()

  # iterate the jobs
  BlackStack::Pampa.jobs.each do |job|
    log.logs("job:#{job.name}... ")

    # ask the job which tasks need a relaunch
    log.logs("Gettting tasks to relaunch (max #{n})... ")
    batch = job.relaunching(n)
    log.logf("done".green + " (#{batch.size.to_s.blue})")

    # relaunch them; each task is identified in the log by the value
    # stored under the job's primary-key field
    batch.each do |task|
      log.logs("Relaunching task #{task[job.field_primary_key.to_sym]}... ")
      job.relaunch(task)
      log.logf('done'.green)
    end

    # closes the "job:..." log line opened above
    log.logf('done'.green)
  end
end
.set_logger(l) ⇒ Object
70 71 72 |
# File 'lib/pampa.rb', line 70
# Replace the module-level logger.
#
# @param l the logger object to store in @@logger
# @return the logger that was just stored
def self.set_logger(l)
  @@logger = l
  @@logger
end
.set_snippets(h) ⇒ Object
75 76 77 78 |
# File 'lib/pampa.rb', line 75
# Install the dispatcher and/or worker snippets.
#
# Only the keys present in `h` are applied; a key that is absent leaves
# the corresponding setting untouched.
#
# @param h hash that may carry :dispatcher_function and :worker_function
# @return the value stored for :worker_function, or nil when that key is
#   absent
def self.set_snippets(h)
  if h.has_key?(:dispatcher_function)
    @@dispatcher_function = h[:dispatcher_function]
  end
  if h.has_key?(:worker_function)
    @@worker_function = h[:worker_function]
  end
end
.stretch ⇒ Object
get attached and unassigned workers. assign and unassign workers to jobs.
Parameters:
-
config: relative path of the configuration file. Example: ‘../config.rb’
-
worker: relative path of the worker.rb file. Example: ‘../worker.rb’
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/pampa.rb', line 96
# Rebalance workers across jobs.
#
# For every registered job:
# 1. collect the attached workers that have no assigned job and whose id
#    matches job.filter_worker_id;
# 2. if there is at least one such worker, collect the attached workers
#    already assigned to the job and read job.pending;
# 3. then either
#    - unassign every assigned worker when there is nothing pending,
#    - unassign all but one assigned worker when pending is under
#      job.max_pending_tasks and more than one worker is assigned, or
#    - assign free workers until job.max_assigned_workers is reached or
#      the free workers run out.
#
# Workers are changed in place through `assigned_job=`. Takes no
# parameters.
#
# @return the collection returned by BlackStack::Pampa.jobs
def self.stretch()
  # getting logger
  l = self.logger()
  # iterate the jobs
  BlackStack::Pampa.jobs.each do |job|
    l.logs "job #{job.name}... "

    # get attached and unassigned workers
    l.logs "Getting attached and unassigned workers... "
    workers = BlackStack::Pampa.workers.select { |w| w.attached && w.assigned_job.nil? }
    l.logf 'done'.green + " (#{workers.size.to_s.blue})"

    # get the workers that match the filter
    l.logs "Getting workers that match the filter... "
    workers = workers.select { |w| w.id =~ job.filter_worker_id }
    l.logf "done".green + " (#{workers.size.to_s.blue})"

    # if there are workers
    if workers.size > 0
      l.logs("Gettting assigned workers... ")
      assigned = BlackStack::Pampa.workers.select { |worker| worker.attached && worker.assigned_job.to_s == job.name.to_s }
      l.logf "done ".green + " (#{assigned.size.to_s.blue})"

      l.logs("Getting total pending (pending) tasks... ")
      pendings = job.pending
      # NOTE(review): this logs `pendings.to_s` while the decisions below
      # use `pendings.size`; whether job.pending returns a collection or a
      # count is not visible here -- confirm and align the two.
      l.logf "done".green + " (#{pendings.to_s.blue})"

      l.logs("0 pending tasks?.... ")
      if pendings.size == 0
        l.logf "yes".green

        l.logs("Unassigning all assigned workers... ")
        assigned.each do |w|
          l.logs("Unassigning worker... ")
          w.assigned_job = nil
          workers << w # add worker back to the list of unassigned
          l.logf "done".green + " (#{w.id.to_s.blue})"
        end
        l.logf 'done'.green
      else
        l.logf "no".red

        l.logs("Under :max_pending_tasks (#{job.max_pending_tasks}) and more than 1 assigned workers ?... ")
        if pendings.size < job.max_pending_tasks && assigned.size > 1
          l.logf "yes".green

          # keep exactly one worker on the job, release the rest
          while assigned.size > 1
            l.logs("Unassigning worker... ")
            w = assigned.pop # TODO: find a worker with no pending tasks
            w.assigned_job = nil
            workers << w # add worker back to the array of unassigned workers
            l.logf "done".green + " (#{w.id.to_s.blue})"
          end
        else
          l.logf "no".red

          # single-line message: a line break inside this literal would
          # split the logs/logf pair across two output lines
          l.logs("Over :max_assigned_workers (#{job.max_assigned_workers.to_s.blue}) and more than 1 assigned workers?... ")
          if assigned.size >= job.max_assigned_workers && assigned.size > 1
            l.logf("yes".green)
          else
            l.logf("no".red)

            # top the job up with free workers until the cap is reached
            i = assigned.size
            while i < job.max_assigned_workers
              i += 1
              l.logs("Assigning worker... ")
              w = workers.pop
              if w.nil?
                l.logf("no more workers".yellow)
                break
              else
                w.assigned_job = job.name.to_sym
                l.logf "done".green + " (#{w.id.to_s.blue})"
              end
            end # while i < job.max_assigned_workers
          end # if assigned.size >= job.max_assigned_workers && assigned.size > 1
        end # if pendings.size < job.max_pending_tasks && assigned.size > 1
      end # if pendings.size == 0
    end # if workers.size > 0

    # closes the "job ..." log line opened at the top of the iteration
    l.logf 'done'.green
  end
end
.worker_function ⇒ Object
84 85 86 |
# File 'lib/pampa.rb', line 84
# Reader for the module-level @@worker_function.
#
# @return whatever set_snippets stored under :worker_function
def self.worker_function()
  return @@worker_function
end
.workers ⇒ Object
return the array of all workers, belonging to all nodes.
40 41 42 |
# File 'lib/pampa.rb', line 40
# Collect the workers of every registered node into one flat array.
#
# @return [Array] the `workers` of each node in @@nodes, flattened
def self.workers
  per_node = @@nodes.map(&:workers)
  per_node.flatten
end