Class: BlackStack::Dispatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/pampa_dispatcher.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(h) ⇒ Dispatcher

setup dispatcher configuration here



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/pampa_dispatcher.rb', line 54

# Builds a dispatcher from a configuration hash.
#
# Each attribute is read from +h+ under the symbol key of the same name.
# A key that is absent from +h+ leaves the matching attribute nil.
#
# @param h [Hash] configuration keyed by symbols (:name, :table, :queue_size, ...)
def initialize(h)
  self.name = h[:name]
  self.table = h[:table]
  self.field_primary_key = h[:field_primary_key]
  self.field_id = h[:field_id]
  self.field_time = h[:field_time]
  self.field_times = h[:field_times]
  self.field_start_time = h[:field_start_time]
  self.field_end_time = h[:field_end_time]
  self.queue_size = h[:queue_size]
  self.max_job_duration_minutes = h[:max_job_duration_minutes]
  self.max_try_times = h[:max_try_times]
  self.occupied_function = h[:occupied_function]
  self.allowing_function = h[:allowing_function]
  self.selecting_function = h[:selecting_function]
  self.relaunching_function = h[:relaunching_function]
  self.relauncher_function = h[:relauncher_function]
  # #start and #finish consult these two hooks, so they have to be
  # configurable through the hash exactly like the other hooks.
  self.starter_function = h[:starter_function]
  self.finisher_function = h[:finisher_function]
end

Instance Attribute Details

#allowing_functionObject

Additional function to decide whether the worker can dispatch or not. Example: use this function when you want to decide based on the remaining credits of the client. It should return true or false. Keep it nil if you want it to always return true.



32
33
34
# File 'lib/pampa_dispatcher.rb', line 32

# Optional callable used by #allowing, which invokes it as
# call(worker, dispatcher). When nil, #allowing returns true.
def allowing_function
  @allowing_function
end

#field_end_timeObject

Returns the value of attribute field_end_time.



15
16
17
# File 'lib/pampa_dispatcher.rb', line 15

# Name of the record field that #finish sets to the current time.
# May be nil; every user of this attribute checks for nil before touching it.
def field_end_time
  @field_end_time
end

#field_idObject

Returns the value of attribute field_id.



11
12
13
# File 'lib/pampa_dispatcher.rb', line 11

# Name of the record field that stores the id of the worker a record is
# assigned to: #run_dispatch writes worker.id into it, #relaunch clears it.
def field_id
  @field_id
end

#field_primary_keyObject

Returns the value of attribute field_primary_key.



10
11
12
# File 'lib/pampa_dispatcher.rb', line 10

# Name of the primary-key field, used to look records up in the table and
# to turn selected rows into a list of ids.
def field_primary_key
  @field_primary_key
end

#field_start_timeObject

Returns the value of attribute field_start_time.



14
15
16
# File 'lib/pampa_dispatcher.rb', line 14

# Name of the record field that #start sets to the current time.
# May be nil; users of this attribute check for nil before touching it.
def field_start_time
  @field_start_time
end

#field_timeObject

Returns the value of attribute field_time.



12
13
14
# File 'lib/pampa_dispatcher.rb', line 12

# Name of the record field that #run_dispatch sets to the current time when
# a record is assigned; #relaunching_dataset compares it against the timeout.
def field_time
  @field_time
end

#field_timesObject

Returns the value of attribute field_times.



13
14
15
# File 'lib/pampa_dispatcher.rb', line 13

# Name of the record field holding the attempt counter that #start
# increments and #selecting_dataset compares with max_try_times.
def field_times
  @field_times
end

#finisher_functionObject

additional function to perform the update on a record to flag the finishing of the job by default this function will set the :field_end_time field with the current datetime keep this parameter nil if you want to use the default algorithm



50
51
52
# File 'lib/pampa_dispatcher.rb', line 50

# Optional callable used by #finish, which invokes it as
# call(record, dispatcher) instead of running the default update.
def finisher_function
  @finisher_function
end

#max_job_duration_minutesObject

Max number of minutes that a job should take to process. If :end_time is still nil x minutes after :start_time, the job is considered failed or interrupted.



20
21
22
# File 'lib/pampa_dispatcher.rb', line 20

# Number of minutes after which #relaunching_dataset treats an assigned
# record as timed out (converted with to_i and multiplied by 60 seconds).
def max_job_duration_minutes
  @max_job_duration_minutes
end

#max_try_timesObject

max number of times that a record can start to process & fail (:start_time field is not nil, but :end_time field is still nil after :max_job_duration_minutes)



23
24
25
# File 'lib/pampa_dispatcher.rb', line 23

# Upper bound for the field_times counter; #selecting_dataset only picks
# records whose counter is NULL or below this value.
def max_try_times
  @max_try_times
end

#nameObject

Returns the value of attribute name.



6
7
8
# File 'lib/pampa_dispatcher.rb', line 6

# Dispatcher name, as supplied under :name in the configuration hash.
def name
  @name
end

#occupied_functionObject

Additional function that returns an array of objects pending to be processed by a worker. It should return an array. Keep it nil if you want to run the default function.



27
28
29
# File 'lib/pampa_dispatcher.rb', line 27

# Optional callable used by #occupied_slots, which invokes it as
# call(worker, dispatcher) instead of querying the table.
def occupied_function
  @occupied_function
end

#queue_sizeObject

max number of records assigned to a worker that have not started (:start_time field is nil)



17
18
19
# File 'lib/pampa_dispatcher.rb', line 17

# Maximum number of pending records per worker: #available_slots subtracts
# the occupied count from it, and #run_relaunch passes it as the batch size.
def queue_size
  @queue_size
end

#relauncher_functionObject

additional function to perform the update on a record to retry keep this parameter nil if you want to use the default algorithm



42
43
44
# File 'lib/pampa_dispatcher.rb', line 42

# Optional callable used by #run_relaunch, which invokes it as
# call(record, dispatcher) instead of calling #relaunch.
def relauncher_function
  @relauncher_function
end

#relaunching_functionObject

additional function to choose the records to retry keep this parameter nil if you want to use the default algorithm



39
40
41
# File 'lib/pampa_dispatcher.rb', line 39

# Optional callable used by #relaunching, which invokes it as
# call(worker, dispatcher, n) instead of using #relaunching_dataset.
def relaunching_function
  @relaunching_function
end

#selecting_functionObject

Additional function to choose the records to launch. It should return an array of IDs. Keep this parameter nil if you want to use the default algorithm.



36
37
38
# File 'lib/pampa_dispatcher.rb', line 36

# Optional callable that #selecting invokes as call(worker, dispatcher, n)
# to choose the ids of the records to dispatch.
def selecting_function
  @selecting_function
end

#starter_functionObject

additional function to perform the update on a record to flag the starting of the job by default this function will set the :field_start_time field with the current datetime, and it will increase the :field_times counter keep this parameter nil if you want to use the default algorithm



46
47
48
# File 'lib/pampa_dispatcher.rb', line 46

# Optional callable used by #start, which invokes it as
# call(record, dispatcher) instead of running the default update.
def starter_function
  @starter_function
end

#tableObject

Database information. :field_times, :field_start_time and :field_end_time may be nil.



9
10
11
# File 'lib/pampa_dispatcher.rb', line 9

# Object the dispatcher queries for records; the methods of this class call
# where and select on it.
# NOTE(review): presumably a Sequel model class or dataset - confirm.
def table
  @table
end

Instance Method Details

#allowing(worker) ⇒ Object

Decides whether the worker can dispatch or not. Example: use this when you want to decide based on the remaining credits of the client. Returns true when no :allowing_function is configured; otherwise returns the result of that function.



99
100
101
102
103
104
105
106
# File 'lib/pampa_dispatcher.rb', line 99

# Tells whether +worker+ is allowed to receive records.
#
# @param worker [Object] the worker being evaluated
# @return [Object] true when no allowing_function is configured, otherwise
#   whatever allowing_function.call(worker, self) returns
def allowing(worker)
  # no custom rule configured: every worker is allowed
  return true if allowing_function.nil?

  # TODO: validate that the custom function returns true or false
  allowing_function.call(worker, self)
end

#available_slots(worker) ⇒ Object

Returns the number of free slots in the processing queue of this worker.



86
87
88
89
90
91
92
93
94
# File 'lib/pampa_dispatcher.rb', line 86

# Number of additional records that can still be queued for +worker+.
#
# @param worker [Object] the worker whose queue is inspected
# @return [Numeric] queue_size minus the number of occupied slots, or 0 when
#   the worker already holds more than queue_size records
def available_slots(worker)
  used = occupied_slots(worker).size
  capacity = queue_size
  # an over-full queue has no free slots (never a negative count)
  return 0 if used > capacity

  capacity - used
end

#finish(o) ⇒ Object



162
163
164
165
166
167
168
169
# File 'lib/pampa_dispatcher.rb', line 162

# Flags record +o+ as finished.
#
# With a finisher_function configured, the work is delegated to
# finisher_function.call(o, self). Otherwise the field named by
# field_end_time (when not nil) is set to now() and the record is saved.
#
# @param o [Object] record supporting []= and save
# @return [Object] result of o.save or of the custom function
def finish(o)
  return finisher_function.call(o, self) unless finisher_function.nil?

  end_field = field_end_time
  o[end_field.to_sym] = now() unless end_field.nil?
  o.save
end

#occupied_slots(worker) ⇒ Object

returns an array of objects pending to be processed by the worker. it will select the records with :reservation_id == worker.id, and :start_time == nil



75
76
77
78
79
80
81
82
83
# File 'lib/pampa_dispatcher.rb', line 75

# Records currently assigned to +worker+ and still pending.
#
# With an occupied_function configured, returns
# occupied_function.call(worker, self). Otherwise queries the table for rows
# whose field_id equals worker.id and, when field_start_time is configured,
# whose start-time field is nil.
#
# @param worker [Object] object responding to id
# @return [Object] result of .all on the filtered table, or the custom result
def occupied_slots(worker)
  unless occupied_function.nil?
    # TODO: validate that the custom function returns an array
    return occupied_function.call(worker, self)
  end

  criteria = { field_id.to_sym => worker.id }
  # only restrict to not-yet-started records when a start-time field exists
  criteria[field_start_time.to_sym] = nil unless field_start_time.nil?
  table.where(criteria).all
end

#relaunch(o) ⇒ Object



144
145
146
147
148
149
150
# File 'lib/pampa_dispatcher.rb', line 144

# Resets record +o+ so it can be dispatched again.
#
# Clears the worker-id and reservation-time fields, also clears the
# start-time and end-time fields when those are configured, then saves.
#
# @param o [Object] record supporting []= and save
# @return [Object] result of o.save
def relaunch(o)
  # these two fields are always configured and always cleared
  [field_id, field_time].each { |column| o[column.to_sym] = nil }
  # these two are optional
  [field_start_time, field_end_time].each do |column|
    o[column.to_sym] = nil unless column.nil?
  end
  o.save
end

#relaunching(worker, n) ⇒ Object



135
136
137
138
139
140
141
142
# File 'lib/pampa_dispatcher.rb', line 135

# Chooses the records to retry.
#
# @param worker [Object] worker passed through to the selection logic
# @param n [Integer] maximum number of records requested
# @return [Object] relaunching_function.call(worker, self, n) when a custom
#   function is configured; otherwise the primary-key values of the rows
#   produced by #relaunching_dataset
def relaunching(worker, n)
  unless relaunching_function.nil?
    # TODO: validate that the custom function returns an array of strings
    return relaunching_function.call(worker, self, n)
  end

  relaunching_dataset(worker, n).map { |row| row[field_primary_key.to_sym] }
end

#relaunching_dataset(worker, n) ⇒ Object

choose the records to retry returns an array of IDs



128
129
130
131
132
133
# File 'lib/pampa_dispatcher.rb', line 128

# Builds the query that picks timed-out records for retry.
#
# Selects the primary-key column of rows whose field_time is older than
# max_job_duration_minutes ago and, when field_end_time is configured,
# whose end-time field is NULL. The result is limited to +n+ rows.
#
# @param worker [Object] not used by the default query
# @param n [Integer] maximum number of rows
# @return [Object] the limited dataset
def relaunching_dataset(worker, n)
  query = table.select(field_primary_key.to_sym)
  deadline = Time.now - 60 * max_job_duration_minutes.to_i
  stamp = deadline.strftime('%Y-%m-%d %H:%M:%S')
  query = query.where("#{field_time} < '#{stamp}'")
  query = query.filter("#{field_end_time} IS NULL") unless field_end_time.nil?
  # The retry-count cap is left disabled, as in the original code:
  #      ds = ds.filter("( #{self.field_times.to_s} IS NULL OR #{self.field_times.to_s} < #{self.max_try_times.to_s} ) ") if !self.field_times.nil?
  query.limit(n)
end

#run_dispatch(worker) ⇒ Object

dispatch records returns the # of records dispatched



189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/pampa_dispatcher.rb', line 189

# Assigns pending records to +worker+, up to its number of free slots.
#
# For every id returned by #selecting the record is loaded, its worker-id
# field is set to worker.id, its reservation-time field is set to now(),
# the optional start/end time fields are cleared, and the record is saved.
#
# @param worker [Object] object responding to id
# @return [Integer] number of records dispatched
def run_dispatch(worker)
  slots = available_slots(worker)
  dispatched = 0
  # nothing to do when the worker's queue is already full
  return dispatched unless slots > 0

  selecting(worker, slots).each do |id|
    # the counter is bumped before the record is touched
    dispatched += 1
    record = table.where(field_primary_key.to_sym => id).first
    record[field_id.to_sym] = worker.id
    record[field_time.to_sym] = now()
    record[field_start_time.to_sym] = nil unless field_start_time.nil?
    record[field_end_time.to_sym] = nil unless field_end_time.nil?
    record.save
    # release resources after each record
    DB.disconnect
    GC.start
  end

  dispatched
end

#run_relaunch(worker) ⇒ Object

relaunch records



172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/pampa_dispatcher.rb', line 172

# Relaunches failed records.
#
# Asks #relaunching for up to queue_size ids, loads each record and resets
# it through relauncher_function.call(record, self) when that hook is
# configured, or through #relaunch otherwise.
#
# @param worker [Object] worker passed through to #relaunching
# @return [Object] the collection returned by #relaunching
def run_relaunch(worker)
  relaunching(worker, queue_size).each do |id|
    record = table.where(field_primary_key.to_sym => id).first
    relauncher_function.nil? ? relaunch(record) : relauncher_function.call(record, self)
    # release resources after each record
    DB.disconnect
    GC.start
  end
end

#selecting(worker, n) ⇒ Object

selecting_dataset



117
118
119
120
121
122
123
124
# File 'lib/pampa_dispatcher.rb', line 117

# Chooses the records to dispatch to +worker+.
#
# @param worker [Object] worker passed through to the selection logic
# @param n [Integer] maximum number of records requested
# @return [Object] selecting_function.call(worker, self, n) when a custom
#   selecting function is configured; otherwise the primary-key values of
#   the rows produced by #selecting_dataset
def selecting(worker, n)
  # The guard must test selecting_function, the hook that is actually called
  # below. Testing allowing_function ignored a configured selector, or called
  # nil when only an allowing function was set.
  if self.selecting_function.nil?
    self.selecting_dataset(worker, n).map { |o| o[self.field_primary_key.to_sym] }
  else
    # TODO: validate that the custom function returns an array of strings
    self.selecting_function.call(worker, self, n)
  end
end

#selecting_dataset(worker, n) ⇒ Object

choose the records to dispatch returns an array of IDs



110
111
112
113
114
115
# File 'lib/pampa_dispatcher.rb', line 110

# Builds the query that picks unassigned records for dispatch.
#
# Selects the primary-key column of rows whose worker-id field is nil.
# When field_end_time is configured, the end-time field must be nil too.
# When field_times is configured, the attempt counter must be NULL or below
# max_try_times. The result is limited to +n+ rows.
#
# @param worker [Object] not used by the default query
# @param n [Integer] maximum number of rows
# @return [Object] the limited dataset
def selecting_dataset(worker, n)
  query = table.select(field_primary_key.to_sym).where(field_id.to_sym => nil)
  query = query.filter(field_end_time.to_sym => nil) unless field_end_time.nil?
  unless field_times.nil?
    query = query.filter("#{field_times} IS NULL OR #{field_times} < #{max_try_times}")
  end
  query.limit(n)
end

#start(o) ⇒ Object



152
153
154
155
156
157
158
159
160
# File 'lib/pampa_dispatcher.rb', line 152

# Flags record +o+ as started.
#
# With a starter_function configured, the work is delegated to
# starter_function.call(o, self). Otherwise the field named by
# field_start_time is set to now() and the counter named by field_times is
# incremented, each only when that field is configured; then the record is
# saved.
#
# @param o [Object] record supporting [], []= and save
# @return [Object] result of o.save or of the custom function
def start(o)
  if self.starter_function.nil?
    o[self.field_start_time.to_sym] = now() if !self.field_start_time.nil?
    # field_times is optional like the time fields, so it needs the same
    # nil guard; without it nil.to_sym raised NoMethodError.
    unless self.field_times.nil?
      counter = self.field_times.to_sym
      o[counter] = o[counter].to_i + 1
    end
    o.save
  else
    self.starter_function.call(o, self)
  end
end