Module: BlackStack::Pampa

Defined in:
lib/pampa.rb

Defined Under Namespace

Classes: Job, Node, Worker

Constant Summary collapse

@@nodes =

arrays of workers, nodes, and jobs.

[]
@@jobs =
[]
@@logger =
BlackStack::DummyLogger.new(nil)
@@dispatcher_function =
nil
@@worker_function =
nil

Class Method Summary collapse

Class Method Details

.add_job(h) ⇒ Object

add a job to the cluster.



45
46
47
# File 'lib/pampa.rb', line 45

def self.add_job(h)
    @@jobs << BlackStack::Pampa::Job.new(h)
end

.add_jobs(a) ⇒ Object

add an array of jobs to the cluster.



50
51
52
53
54
55
56
57
58
# File 'lib/pampa.rb', line 50

def self.add_jobs(a)
    # validate: the parameter a is an array
    raise "The parameter a is not an array" unless a.is_a?(Array)
    # iterate over the array
    a.each do |h|
        # create the job
        self.add_job(h)
    end
end

.add_node(h) ⇒ Object

add a node to the cluster.



19
20
21
# File 'lib/pampa.rb', line 19

def self.add_node(h)
    @@nodes << BlackStack::Pampa::Node.new(h)
end

.add_nodes(a) ⇒ Object

add an array of nodes to the cluster.



24
25
26
27
28
29
30
31
32
# File 'lib/pampa.rb', line 24

def self.add_nodes(a)
    # validate: the parameter a is an array
    raise "The parameter a is not an array" unless a.is_a?(Array)
    # iterate over the array
    a.each do |h|
        # create the node
        self.add_node(h)
    end
end

.dispatchObject

iterate the workers. for each worker, iterate the job.

Parameters:

  • config: relative path of the configuration file. Example: ‘../config.rb’

  • worker: relative path of the worker.rb file. Example: ‘../worker.rb’



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/pampa.rb', line 211

def self.dispatch()
    # getting logger
    l = self.logger()
    # iterate the workers
    BlackStack::Pampa.workers.each { |worker|
        l.logs("worker:#{worker.id} (job:#{worker.assigned_job.to_s})... ")
        if !worker.attached
          l.logf("detached".green)
        else
          if worker.assigned_job.nil?
            l.logf("unassigned".yellow)
          else
            # get the job this worker is assigned to
            job = BlackStack::Pampa.jobs.select { |j| j.name.to_s == worker.assigned_job.to_s }.first
            if job.nil?
              l.logf("job #{job.name} not found".red)
            else
              l.logf("done".green + " (#{job.run_dispatch(worker).to_s.blue})")
            end
          end
        end
    } # @@nodes.each do |node|            
end

.dispatcher_functionObject



80
81
82
# File 'lib/pampa.rb', line 80

def self.dispatcher_function
  @@dispatcher_function
end

.jobsObject

return the array of nodes.



61
62
63
# File 'lib/pampa.rb', line 61

def self.jobs()
  @@jobs
end

.loggerObject

get and set logger



66
67
68
# File 'lib/pampa.rb', line 66

def self.logger()
  @@logger
end

.nodesObject

return the array of nodes.



35
36
37
# File 'lib/pampa.rb', line 35

def self.nodes()
    @@nodes
end

.relaunch(n = 10000) ⇒ Object

iterate the jobs. for each job, get all the tasks to relaunch. for each task to relaunch, relaunch it.

Parameters:

  • config: relative path of the configuration file. Example: ‘../config.rb’

  • worker: relative path of the worker.rb file. Example: ‘../worker.rb’



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/pampa.rb', line 184

def self.relaunch(n=10000)
  # getting logger
  l = self.logger()
  # iterate the workers
  BlackStack::Pampa.jobs.each { |job|
    l.logs("job:#{job.name}... ")
      l.logs("Gettting tasks to relaunch (max #{n})... ")
      tasks = job.relaunching(n)
      l.logf("done".green + " (#{tasks.size.to_s.blue})")

      tasks.each { |task| 
        l.logs("Relaunching task #{task[job.field_primary_key.to_sym]}... ")
        job.relaunch(task)
        l.logf 'done'.green
      }

    l.logf 'done'.green
  }
end

.set_logger(l) ⇒ Object



70
71
72
# File 'lib/pampa.rb', line 70

def self.set_logger(l)
  @@logger = l
end

.set_snippets(h) ⇒ Object



75
76
77
78
# File 'lib/pampa.rb', line 75

def self.set_snippets(h)
  @@dispatcher_function = h[:dispatcher_function] if h.has_key?(:dispatcher_function)
  @@worker_function = h[:worker_function] if h.has_key?(:worker_function)
end

.stretchObject

get attached and unassigned workers. assign and unassign workers to jobs.

Parameters:

  • config: relative path of the configuration file. Example: ‘../config.rb’

  • worker: relative path of the worker.rb file. Example: ‘../worker.rb’



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/pampa.rb', line 96

def self.stretch()
  # getting logger
  l = self.logger()
  # get the job this worker is working with
  BlackStack::Pampa.jobs.each { |job|
    l.logs "job #{job.name}... "
      # get attached and unassigned workers 
      l.logs "Getting attached and unassigned workers... "
      workers = BlackStack::Pampa.workers.select { |w| w.attached && w.assigned_job.nil? }
      l.logf 'done'.green + " (#{workers.size.to_s.blue})"
      # get the workers that match the filter
      l.logs "Getting workers that match the filter... "
      workers = workers.select { |w| w.id =~ job.filter_worker_id }
      l.logf "done".green + " (#{workers.size.to_s.blue})"
      # if theere are workers
      if workers.size > 0
        l.logs("Gettting assigned workers... ") 
        assigned = BlackStack::Pampa.workers.select { |worker| worker.attached && worker.assigned_job.to_s == job.name.to_s }
        l.logf "done ".green + " (#{assigned.size.to_s.blue})"

        l.logs("Getting total pending (pending) tasks... ")
        pendings = job.pending
        l.logf "done".green + " (#{pendings.to_s.blue})"

        l.logs("0 pending tasks?.... ")
        if pendings.size == 0
          l.logf "yes".green

          l.logs("Unassigning all assigned workers... ")
          assigned.each { |w|
            l.logs("Unassigning worker... ")
            w.assigned_job = nil
            workers << w # add worker back to the list of unassigned
            l.logf "done".green + " (#{w.id.to_s.blue})"
          }
          l.logf 'done'.green
        else
          l.logf "no".red

          l.logs("Under :max_pending_tasks (#{job.max_pending_tasks}) and more than 1 assigned workers ?... ")
          if pendings.size < job.max_pending_tasks && assigned.size > 1
            l.logf "yes".green

            while assigned.size > 1
              l.logs("Unassigning worker... ")
              w = assigned.pop # TODO: find a worker with no pending tasks
              w.assigned_job = nil
              workers << w # add worker back to the array of unassigned workers
              l.logf "done".green + " (#{w.id.to_s.blue})"
            end
          else
            l.logf "no".red

            l.logs("Over :max_assigned_workers (#{job.max_assigned_workers.to_s.blue}) and more than 1 assigned workers?... ")
            if assigned.size >= job.max_assigned_workers && assigned.size > 1
              l.logf("yes".green)
            else
              l.logf("no".red)

              i = assigned.size
              while i < job.max_assigned_workers
                i += 1
                l.logs("Assigning worker... ")
                w = workers.pop
                if w.nil?
                  l.logf("no more workers".yellow)
                  break
                else
                  w.assigned_job = job.name.to_sym
                  l.logf "done".green + " (#{w.id.to_s.blue})"
                end
              end # while i < job.max_assigned_workers
            end # if assigned.size >= job.max_assigned_workers && assigned.size > 0
          end # if pendings.size < job.max_pending_tasks && assigned.size > 1
        end # if pendings.size == 0
      end # if workers.size > 0
    l.logf 'done'.green
  }
end

.worker_functionObject



84
85
86
# File 'lib/pampa.rb', line 84

def self.worker_function
  @@worker_function
end

.workersObject

return the array of all workers, beloning all nodes.



40
41
42
# File 'lib/pampa.rb', line 40

def self.workers()
  @@nodes.map { |node| node.workers }.flatten
end