Class: BlackStack::Dispatcher
Inherits: Object
- Object
  - BlackStack::Dispatcher
Defined in: lib/pampa_dispatcher.rb
Instance Attribute Summary
-
#allowing_function ⇒ Object
Optional function that decides whether the worker can dispatch. For example, use it to decide based on the client's remaining credits. It should return true or false. Keep it nil to always allow dispatching.
-
#field_end_time ⇒ Object
Returns the value of attribute field_end_time.
-
#field_id ⇒ Object
Returns the value of attribute field_id.
-
#field_primary_key ⇒ Object
Returns the value of attribute field_primary_key.
-
#field_start_time ⇒ Object
Returns the value of attribute field_start_time.
-
#field_time ⇒ Object
Returns the value of attribute field_time.
-
#field_times ⇒ Object
Returns the value of attribute field_times.
-
#finisher_function ⇒ Object
Optional function that updates a record to flag the job as finished. By default, the :field_end_time field is set to the current datetime. Keep this parameter nil to use the default algorithm.
-
#max_job_duration_minutes ⇒ Object
max number of minutes that a job should take to process.
-
#max_try_times ⇒ Object
max number of times that a record can start to process & fail (:start_time field is not nil, but :end_time field is still nil after :max_job_duration_minutes).
-
#name ⇒ Object
Returns the value of attribute name.
-
#occupied_function ⇒ Object
Optional function that returns an array of objects pending to be processed by a worker.
-
#queue_size ⇒ Object
max number of records assigned to a worker that have not started (:start_time field is nil).
-
#relauncher_function ⇒ Object
Optional function that updates a record so it can be retried. Keep this parameter nil to use the default algorithm.
-
#relaunching_function ⇒ Object
Optional function that chooses the records to retry. Keep this parameter nil to use the default algorithm.
-
#selecting_function ⇒ Object
Optional function that chooses the records to launch. It should return an array of IDs. Keep this parameter nil to use the default algorithm.
-
#starter_function ⇒ Object
Optional function that updates a record to flag the job as started. By default, the :field_start_time field is set to the current datetime and the :field_times counter is incremented. Keep this parameter nil to use the default algorithm.
-
#table ⇒ Object
Database information. :field_times, :field_start_time and :field_end_time may be nil.
Instance Method Summary
-
#allowing(worker) ⇒ Object
Decides whether the worker can dispatch. For example, use it to decide based on the client's remaining credits. Returns true when allowing_function is nil; otherwise returns the result of allowing_function.
-
#available_slots(worker) ⇒ Object
Returns the number of free slots in the processing queue of this worker.
- #finish(o) ⇒ Object
-
#initialize(h) ⇒ Dispatcher
Constructor. Sets up the dispatcher configuration.
-
#occupied_slots(worker) ⇒ Object
returns an array of objects pending to be processed by the worker.
- #relaunch(o) ⇒ Object
- #relaunching(worker, n) ⇒ Object
-
#relaunching_dataset(worker, n) ⇒ Object
Chooses the records to retry. Returns a dataset of at most n records holding only the primary key; #relaunching maps it to an array of IDs.
-
#run_dispatch(worker) ⇒ Object
Dispatches records. Returns the number of records dispatched.
-
#run_relaunch(worker) ⇒ Object
relaunch records.
-
#selecting(worker, n) ⇒ Object
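Returns the IDs of the records to dispatch, taken from #selecting_dataset by default or from the custom selecting_function.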
-
#selecting_dataset(worker, n) ⇒ Object
Chooses the records to dispatch. Returns a dataset of at most n records holding only the primary key; #selecting maps it to an array of IDs.
- #start(o) ⇒ Object
Constructor Details
#initialize(h) ⇒ Dispatcher
setup dispatcher configuration here
    # File 'lib/pampa_dispatcher.rb', line 54
    def initialize(h)
      self.name = h[:name]
      self.table = h[:table]
      self.field_primary_key = h[:field_primary_key]
      self.field_id = h[:field_id]
      self.field_time = h[:field_time]
      self.field_times = h[:field_times]
      self.field_start_time = h[:field_start_time]
      self.field_end_time = h[:field_end_time]
      self.queue_size = h[:queue_size]
      self.max_job_duration_minutes = h[:max_job_duration_minutes]
      self.max_try_times = h[:max_try_times]
      self.occupied_function = h[:occupied_function]
      self.allowing_function = h[:allowing_function]
      self.selecting_function = h[:selecting_function]
      self.relaunching_function = h[:relaunching_function]
      self.relauncher_function = h[:relauncher_function]
    end
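A minimal sketch of the hash that #initialize reads, using only the keys visible in the listing above. Every value is hypothetical (including the column names and the dispatcher name), and :table is left as a placeholder for whatever model or dataset backs the jobs table.

```ruby
# Hypothetical configuration; only the keys come from the constructor listing.
DISPATCHER_CONFIG = {
  name: 'example.dispatcher',        # hypothetical name
  table: nil,                        # placeholder: model/dataset of the jobs table
  field_primary_key: :id,            # assumed column names below
  field_id: :reservation_id,
  field_time: :reservation_time,
  field_times: :reservation_times,
  field_start_time: :start_time,
  field_end_time: :end_time,
  queue_size: 5,
  max_job_duration_minutes: 15,
  max_try_times: 3,
  occupied_function: nil,            # nil => default behaviour
  allowing_function: nil,
  selecting_function: nil,
  relaunching_function: nil,
  relauncher_function: nil
}.freeze

# With the library loaded, this would presumably be passed as:
# dispatcher = BlackStack::Dispatcher.new(DISPATCHER_CONFIG)
```

The constructor as listed does not read :starter_function or :finisher_function, so those would presumably have to be assigned on the instance after construction.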
Instance Attribute Details
#allowing_function ⇒ Object
Optional function that decides whether the worker can dispatch. For example, use it to decide based on the client's remaining credits. It should return true or false. Keep it nil to always allow dispatching.
    # File 'lib/pampa_dispatcher.rb', line 32
    def allowing_function
      @allowing_function
    end
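As an illustration of the credits example, here is a sketch of a callable that could be assigned to allowing_function. The two-argument call shape (worker, dispatcher) follows the #allowing listing on this page; the CreditWorker struct and its remaining_credits attribute are assumptions made for the example.

```ruby
# Hypothetical worker type; `remaining_credits` is an assumed attribute.
CreditWorker = Struct.new(:id, :remaining_credits)

# Invoked as allowing_function.call(worker, dispatcher); should return true or false.
ALLOW_IF_CREDITS = lambda do |worker, _dispatcher|
  worker.remaining_credits.to_i > 0
end

ALLOW_IF_CREDITS.call(CreditWorker.new('w1', 10), nil)
```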
#field_end_time ⇒ Object
Returns the value of attribute field_end_time.
    # File 'lib/pampa_dispatcher.rb', line 15
    def field_end_time
      @field_end_time
    end
#field_id ⇒ Object
Returns the value of attribute field_id.
    # File 'lib/pampa_dispatcher.rb', line 11
    def field_id
      @field_id
    end
#field_primary_key ⇒ Object
Returns the value of attribute field_primary_key.
    # File 'lib/pampa_dispatcher.rb', line 10
    def field_primary_key
      @field_primary_key
    end
#field_start_time ⇒ Object
Returns the value of attribute field_start_time.
    # File 'lib/pampa_dispatcher.rb', line 14
    def field_start_time
      @field_start_time
    end
#field_time ⇒ Object
Returns the value of attribute field_time.
    # File 'lib/pampa_dispatcher.rb', line 12
    def field_time
      @field_time
    end
#field_times ⇒ Object
Returns the value of attribute field_times.
    # File 'lib/pampa_dispatcher.rb', line 13
    def field_times
      @field_times
    end
#finisher_function ⇒ Object
Optional function that updates a record to flag the job as finished. By default, the :field_end_time field is set to the current datetime. Keep this parameter nil to use the default algorithm.
    # File 'lib/pampa_dispatcher.rb', line 50
    def finisher_function
      @finisher_function
    end
#max_job_duration_minutes ⇒ Object
Max number of minutes that a job should take to process. If :end_time is still nil this many minutes after :start_time, the job is considered failed or interrupted.
    # File 'lib/pampa_dispatcher.rb', line 20
    def max_job_duration_minutes
      @max_job_duration_minutes
    end
#max_try_times ⇒ Object
max number of times that a record can start to process & fail (:start_time field is not nil, but :end_time field is still nil after :max_job_duration_minutes)
    # File 'lib/pampa_dispatcher.rb', line 23
    def max_try_times
      @max_try_times
    end
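The failure and retry rules described for max_job_duration_minutes and max_try_times can be sketched as two small predicates. This follows the attribute descriptions rather than the exact queries in the library; the method names are hypothetical.

```ruby
# True when a job has started, has not ended, and has run past the limit.
def job_timed_out?(start_time, end_time, max_minutes, now = Time.now)
  return false if start_time.nil? || !end_time.nil?
  (now - start_time) > max_minutes * 60
end

# A record stays eligible while its try counter is nil or below max_try_times.
def can_retry?(times, max_try_times)
  times.to_i < max_try_times
end
```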
#name ⇒ Object
Returns the value of attribute name.
    # File 'lib/pampa_dispatcher.rb', line 6
    def name
      @name
    end
#occupied_function ⇒ Object
Optional function that returns an array of objects pending to be processed by a worker. It should return an array. Keep it nil to run the default function.
    # File 'lib/pampa_dispatcher.rb', line 27
    def occupied_function
      @occupied_function
    end
#queue_size ⇒ Object
max number of records assigned to a worker that have not started (:start_time field is nil)
    # File 'lib/pampa_dispatcher.rb', line 17
    def queue_size
      @queue_size
    end
#relauncher_function ⇒ Object
Optional function that updates a record so it can be retried. Keep this parameter nil to use the default algorithm.
    # File 'lib/pampa_dispatcher.rb', line 42
    def relauncher_function
      @relauncher_function
    end
#relaunching_function ⇒ Object
Optional function that chooses the records to retry. Keep this parameter nil to use the default algorithm.
    # File 'lib/pampa_dispatcher.rb', line 39
    def relaunching_function
      @relaunching_function
    end
#selecting_function ⇒ Object
Optional function that chooses the records to launch. It should return an array of IDs. Keep this parameter nil to use the default algorithm.
    # File 'lib/pampa_dispatcher.rb', line 36
    def selecting_function
      @selecting_function
    end
#starter_function ⇒ Object
Optional function that updates a record to flag the job as started. By default, the :field_start_time field is set to the current datetime and the :field_times counter is incremented. Keep this parameter nil to use the default algorithm.
    # File 'lib/pampa_dispatcher.rb', line 46
    def starter_function
      @starter_function
    end
#table ⇒ Object
Database information. :field_times, :field_start_time and :field_end_time may be nil.
    # File 'lib/pampa_dispatcher.rb', line 9
    def table
      @table
    end
Instance Method Details
#allowing(worker) ⇒ Object
Decides whether the worker can dispatch. For example, use it to decide based on the client's remaining credits. Returns true when allowing_function is nil; otherwise returns the result of allowing_function.
    # File 'lib/pampa_dispatcher.rb', line 99
    def allowing(worker)
      if self.allowing_function.nil?
        return true
      else
        # TODO: validate that it returns true or false
        return self.allowing_function.call(worker, self)
      end
    end
#available_slots(worker) ⇒ Object
Returns the number of free slots in the processing queue of this worker.
    # File 'lib/pampa_dispatcher.rb', line 86
    def available_slots(worker)
      occupied = self.occupied_slots(worker).size
      allowed = self.queue_size
      if occupied > allowed
        return 0
      else
        return allowed - occupied
      end
    end
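The slot arithmetic above amounts to a subtraction clamped at zero. A standalone sketch (the method name free_slots is hypothetical):

```ruby
# Free slots = queue_size minus occupied slots, never below zero.
def free_slots(occupied, queue_size)
  [queue_size - occupied, 0].max
end

free_slots(2, 5) # a worker with 2 pending records and a queue of 5 has 3 free slots
```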
#finish(o) ⇒ Object
    # File 'lib/pampa_dispatcher.rb', line 162
    def finish(o)
      if self.finisher_function.nil?
        o[self.field_end_time.to_sym] = now() if !self.field_end_time.nil?
        o.save
      else
        self.finisher_function.call(o, self)
      end
    end
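A sketch of a custom callable that could serve as finisher_function, using the (record, dispatcher) call shape seen in #finish. A plain Hash stands in for the record here, so there is no save call; the :end_time and :status columns are assumptions.

```ruby
# Invoked as finisher_function.call(record, dispatcher).
# With a real model object, the function would also need to persist the record.
FINISH_WITH_STATUS = lambda do |record, _dispatcher|
  record[:end_time] = Time.now
  record[:status] = 'done' # hypothetical extra column
  record
end
```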
#occupied_slots(worker) ⇒ Object
Returns an array of objects pending to be processed by the worker. By default it selects the records whose :field_id column (e.g. :reservation_id) equals worker.id and, when :field_start_time is configured, whose start time is nil.
    # File 'lib/pampa_dispatcher.rb', line 75
    def occupied_slots(worker)
      if self.occupied_function.nil?
        return self.table.where(self.field_id.to_sym => worker.id, self.field_start_time.to_sym => nil).all if !self.field_start_time.nil?
        return self.table.where(self.field_id.to_sym => worker.id).all if self.field_start_time.nil?
      else
        # TODO: validate the return value (an array is expected)
        return self.occupied_function.call(worker, self)
      end
    end
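The default filter can be mirrored in memory, which also shows the shape a custom occupied_function might take. The records, column names and OccWorker struct below are hypothetical stand-ins for the real table and worker.

```ruby
# Hypothetical in-memory stand-ins for the jobs table and a worker.
OccWorker = Struct.new(:id)
OCC_RECORDS = [
  { id: 1, reservation_id: 'w1', start_time: nil },
  { id: 2, reservation_id: 'w1', start_time: Time.at(0) },
  { id: 3, reservation_id: 'w2', start_time: nil }
].freeze

# Invoked as occupied_function.call(worker, dispatcher); should return an array.
OCCUPIED_FN = lambda do |worker, _dispatcher|
  OCC_RECORDS.select { |r| r[:reservation_id] == worker.id && r[:start_time].nil? }
end
```

Only record 1 is assigned to worker 'w1' and not yet started, so it is the only one counted against that worker's queue.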
#relaunch(o) ⇒ Object
    # File 'lib/pampa_dispatcher.rb', line 144
    def relaunch(o)
      o[self.field_id.to_sym] = nil
      o[self.field_time.to_sym] = nil
      o[self.field_start_time.to_sym] = nil if !self.field_start_time.nil?
      o[self.field_end_time.to_sym] = nil if !self.field_end_time.nil?
      o.save
    end
#relaunching(worker, n) ⇒ Object
    # File 'lib/pampa_dispatcher.rb', line 135
    def relaunching(worker, n)
      if self.relaunching_function.nil?
        return self.relaunching_dataset(worker, n).map { |o| o[self.field_primary_key.to_sym] }
      else
        # TODO: validate that it returns an array of strings
        return self.relaunching_function.call(worker, self, n)
      end
    end
#relaunching_dataset(worker, n) ⇒ Object
Chooses the records to retry. Returns a dataset of at most n records holding only the primary key; #relaunching maps it to an array of IDs.
    # File 'lib/pampa_dispatcher.rb', line 128
    def relaunching_dataset(worker, n)
      ds = self.table.select(self.field_primary_key.to_sym).where("#{self.field_time.to_s} < '#{(Time.now - 60*self.max_job_duration_minutes.to_i).strftime('%Y-%m-%d %H:%M:%S').to_s}'")
      ds = ds.filter("#{self.field_end_time.to_s} IS NULL") if !self.field_end_time.nil?
      # ds = ds.filter("( #{self.field_times.to_s} IS NULL OR #{self.field_times.to_s} < #{self.max_try_times.to_s} ) ") if !self.field_times.nil?
      ds = ds.limit(n)
    end
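The cutoff timestamp embedded in the query above can be computed in isolation. This sketch reuses the same arithmetic and format string; the helper name relaunch_cutoff is hypothetical.

```ruby
# Records whose :field_time is older than this timestamp are relaunch candidates.
def relaunch_cutoff(max_job_duration_minutes, now = Time.now)
  (now - 60 * max_job_duration_minutes.to_i).strftime('%Y-%m-%d %H:%M:%S')
end

relaunch_cutoff(15, Time.utc(2024, 1, 1, 12, 0, 0)) # => "2024-01-01 11:45:00"
```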
#run_dispatch(worker) ⇒ Object
Dispatches records. Returns the number of records dispatched.
    # File 'lib/pampa_dispatcher.rb', line 189
    def run_dispatch(worker)
      # get # of available slots
      n = self.available_slots(worker)

      # dispatching n pending records
      i = 0
      if n>0
        self.selecting(worker, n).each { |id|
          # count the # of dispatched
          i += 1
          # dispatch records
          o = self.table.where(self.field_primary_key.to_sym => id).first
          o[self.field_id.to_sym] = worker.id
          o[self.field_time.to_sym] = now()
          o[self.field_start_time.to_sym] = nil if !self.field_start_time.nil?
          o[self.field_end_time.to_sym] = nil if !self.field_end_time.nil?
          o.save
          # release resources
          DB.disconnect
          GC.start
        }
      end

      # number of records dispatched
      return i
    end
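To make the dispatch loop easier to follow, here is an in-memory sketch of the same steps, with hashes standing in for table rows. The column names and the dispatch_in_memory helper are assumptions, and no database is involved.

```ruby
# Assigns up to `free_slots` unreserved records to the worker and
# returns how many were dispatched.
def dispatch_in_memory(records, worker_id, free_slots, now = Time.now)
  return 0 if free_slots <= 0
  pending = records.select { |r| r[:reservation_id].nil? }.first(free_slots)
  pending.each do |r|
    r[:reservation_id] = worker_id # reserve the record for this worker
    r[:reservation_time] = now     # remember when it was reserved
    r[:start_time] = nil           # reset progress flags
    r[:end_time] = nil
  end
  pending.size
end
```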
#run_relaunch(worker) ⇒ Object
relaunch records
    # File 'lib/pampa_dispatcher.rb', line 172
    def run_relaunch(worker)
      # relaunch failed records
      self.relaunching(worker, self.queue_size).each { |id|
        o = self.table.where(self.field_primary_key.to_sym => id).first
        if self.relauncher_function.nil?
          self.relaunch(o)
        else
          self.relauncher_function.call(o, self)
        end
        # release resources
        DB.disconnect
        GC.start
      }
    end
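A sketch of a custom relauncher_function, using the (record, dispatcher) call shape seen in #run_relaunch. It repeats the default reset and additionally counts relaunches; the column names, including :relaunches, are hypothetical, and a Hash stands in for the record, so nothing is saved.

```ruby
# Invoked as relauncher_function.call(record, dispatcher).
RELAUNCH_AND_COUNT = lambda do |record, _dispatcher|
  record[:reservation_id] = nil
  record[:reservation_time] = nil
  record[:start_time] = nil
  record[:end_time] = nil
  record[:relaunches] = record[:relaunches].to_i + 1 # hypothetical column
  record
end
```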
#selecting(worker, n) ⇒ Object
Returns the IDs of the records to dispatch, taken from #selecting_dataset by default or from the custom selecting_function.
    # File 'lib/pampa_dispatcher.rb', line 117
    def selecting(worker, n)
      if self.selecting_function.nil?
        return self.selecting_dataset(worker, n).map { |o| o[self.field_primary_key.to_sym] }
      else
        # TODO: validate that it returns an array of strings
        return self.selecting_function.call(worker, self, n)
      end
    end
#selecting_dataset(worker, n) ⇒ Object
Chooses the records to dispatch. Returns a dataset of at most n records holding only the primary key; #selecting maps it to an array of IDs.
    # File 'lib/pampa_dispatcher.rb', line 110
    def selecting_dataset(worker, n)
      ds = self.table.select(self.field_primary_key.to_sym).where(self.field_id.to_sym => nil)
      ds = ds.filter(self.field_end_time.to_sym => nil) if !self.field_end_time.nil?
      ds = ds.filter("#{self.field_times.to_s} IS NULL OR #{self.field_times.to_s} < #{self.max_try_times.to_s}") if !self.field_times.nil?
      ds.limit(n)
    end
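A sketch of a custom selecting_function that applies the same "unreserved and unfinished" filter in memory but orders candidates by a priority column. The three-argument call shape (worker, dispatcher, n) follows the #selecting listing; the records and the :priority column are assumptions.

```ruby
# Hypothetical in-memory stand-in for the jobs table.
SEL_RECORDS = [
  { id: 10, reservation_id: nil,  end_time: nil, priority: 1 },
  { id: 11, reservation_id: nil,  end_time: nil, priority: 5 },
  { id: 12, reservation_id: 'w9', end_time: nil, priority: 9 },
  { id: 13, reservation_id: nil,  end_time: nil, priority: 3 }
].freeze

# Invoked as selecting_function.call(worker, dispatcher, n); returns an array of IDs.
SELECT_BY_PRIORITY = lambda do |_worker, _dispatcher, n|
  SEL_RECORDS
    .select { |r| r[:reservation_id].nil? && r[:end_time].nil? }
    .sort_by { |r| -r[:priority] }
    .first(n)
    .map { |r| r[:id] }
end

SELECT_BY_PRIORITY.call(nil, nil, 2) # => [11, 13]
```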
#start(o) ⇒ Object
    # File 'lib/pampa_dispatcher.rb', line 152
    def start(o)
      if self.starter_function.nil?
        o[self.field_start_time.to_sym] = now() if !self.field_start_time.nil?
        o[self.field_times.to_sym] = o[self.field_times.to_sym].to_i + 1
        o.save
      else
        self.starter_function.call(o, self)
      end
    end
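The default start behaviour can be sketched on a plain Hash to show how the try counter moves from nil to 1 and upward. The column names and the start_in_memory helper are hypothetical.

```ruby
# Flags the record as started and increments its try counter.
# nil.to_i is 0, so a record that was never tried ends up with a counter of 1.
def start_in_memory(record, now = Time.now)
  record[:start_time] = now
  record[:reservation_times] = record[:reservation_times].to_i + 1
  record
end
```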