Class: ActiveRecord::Mapreduce

Inherits:
Object
  • Object
show all
Defined in:
lib/skynet/skynet_active_record_extensions.rb

Constant Summary collapse

BATCH_SIZE =
1000
MAX_BATCHES_PER_JOB =
1000

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Mapreduce

Returns a new instance of Mapreduce.



72
73
74
75
76
# File 'lib/skynet/skynet_active_record_extensions.rb', line 72

# Builds a Mapreduce from an options hash.
#
# Recognised keys:
#   :find_args   - stored unchanged in @find_args
#   :batch_size  - stored in @batch_size; BATCH_SIZE is used when the value
#                  is nil or false
#   :model_class - stored unchanged in @model_class
def initialize(options = {})
  find_args, batch_size, model_class =
    options.values_at(:find_args, :batch_size, :model_class)

  @find_args   = find_args
  @batch_size  = batch_size || BATCH_SIZE
  @model_class = model_class
end

Instance Attribute Details

#batch_size ⇒ Object

Returns the value of attribute batch_size.



66
67
68
# File 'lib/skynet/skynet_active_record_extensions.rb', line 66

# Reader for @batch_size, which #initialize sets from options[:batch_size]
# (falling back to BATCH_SIZE).
def batch_size
  @batch_size
end

#find_args ⇒ Object

Returns the value of attribute find_args.



66
67
68
# File 'lib/skynet/skynet_active_record_extensions.rb', line 66

# Reader for @find_args, the finder options stored by #initialize from
# options[:find_args].
def find_args
  @find_args
end

#model_class ⇒ Object

Returns the value of attribute model_class.



67
68
69
# File 'lib/skynet/skynet_active_record_extensions.rb', line 67

# Reader for @model_class, stored by #initialize from options[:model_class].
# #model_klass calls `constantize` on this value, so it is presumably the
# model's class name as a String -- confirm against callers.
def model_class
  @model_class
end

Class Method Details

.find(*args) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
# File 'lib/skynet/skynet_active_record_extensions.rb', line 86

# Builds a Mapreduce from ActiveRecord-style finder arguments.
#
# A leading non-Hash argument (for example :all) is discarded. The first
# remaining argument is used as the options hash; when nothing remains an
# empty hash is used. :batch_size and :model_class are removed from the
# options and handed to the constructor; whatever is left becomes :find_args.
#
# The options hash is copied before those keys are removed, so the hash the
# caller passed in is left untouched.
def self.find(*args)
  args.shift unless args.first.is_a?(Hash)

  # `args = *args` only unwrapped a one-element array on Ruby 1.8; on 1.9+ it
  # leaves an Array behind, so the options hash is picked out explicitly.
  options = (args.first || {}).dup

  batch_size  = options.delete(:batch_size)
  model_class = options.delete(:model_class)
  new(:find_args => options, :batch_size => batch_size, :model_class => model_class)
end

.log ⇒ Object



223
224
225
# File 'lib/skynet/skynet_active_record_extensions.rb', line 223

# Class-level logger accessor: returns whatever Skynet::Logger.get returns.
# Used by .map for error reporting.
def self.log
  Skynet::Logger.get
end

.map(datas) ⇒ Object



227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
# File 'lib/skynet/skynet_active_record_extensions.rb', line 227

# Processes a list of batch descriptors, loading the records of each batch and
# applying a handler to every record. Always returns nil.
#
# Each element of +datas+ is expected to be an Array shaped like the items
# built by the instance-level #map:
#
#   data[0] - first primary-key value of the range (inclusive)
#   data[1] - last primary-key value (inclusive); the upper bound is only
#             applied when data[1] > data[0], otherwise the range is open-ended
#   data[2] - finder options Hash (or nil); its :conditions are combined with
#             the id range and its :select is overwritten with "<table>.*"
#   data[3] - model class name (resolved with String#constantize)
#   data[4] - handler: a String naming either an instance method on the record
#             or a class responding to .each(record), or any object that
#             responds to #call(record)
#
# Non-Array and empty elements are skipped. Errors raised while handling a
# single record are logged and do not stop the remaining records.
def self.map(datas)
  return unless datas and not datas.empty?
  datas.each do |data|
    next if (not data.is_a?(Array))
    next if data.empty?
    model_class = data[3].constantize
    table_name = model_class.table_name
    key_column = "#{table_name}.#{model_class.primary_key}"
    conditions = "#{key_column} >= #{data[0]}"
    conditions += " AND #{key_column} <= #{data[1]}" if data[1] > data[0]
    conditions = "(#{conditions})"
    if not data[2]
      data[2] = {:conditions => conditions}
    elsif data[2].is_a?(Hash) and data[2].empty?
      data[2] = {:conditions => conditions}
    elsif data[2].is_a?(Hash) and (not data[2][:conditions] or data[2][:conditions].empty?)
      data[2][:conditions] = conditions
    else
      # Wrap the caller's conditions in parentheses: AND binds tighter than OR,
      # so "a OR b AND (range)" would let rows outside the id range through.
      data[2][:conditions] = "(" + data[2][:conditions] + ") AND " + conditions
    end
    data[2][:select] = "#{table_name}.*"

    models = model_class.find(:all, data[2])
    models.each do |ar_object|
      begin
        if data[4].kind_of?(String)
          if ar_object.respond_to?(data[4].to_sym)
            ar_object.send(data[4].to_sym)
          else
            # Only the constant lookup is guarded, so a NameError/NoMethodError
            # raised *inside* the handler is logged as itself instead of being
            # rewritten into the "not a class" message.
            begin
              handler_class = data[4].constantize
            rescue NameError
              raise NameError.new("#{data[4]} is not a class or an instance method in #{model_class}")
            end
            handler_class.each(ar_object)
          end
        else
          data[4].call(ar_object)
        end
      # NOTE(review): rescuing Exception also traps signals and exit requests;
      # left unchanged so per-record failures keep being logged and skipped.
      rescue Exception => e
        if data[4].kind_of?(String)
          log.error("Error in #{data[4]} #{e.inspect} #{e.backtrace.join("\n")}")
        else
          log.error("Error in #{self} with given block #{e.inspect} #{e.backtrace.join("\n")}")
        end
      end
    end
  end
  nil
end

.model_class(model_class) ⇒ Object



217
218
219
220
221
# File 'lib/skynet/skynet_active_record_extensions.rb', line 217

# Class-level declaration: defines a singleton `model_class` method on the
# receiver that takes no arguments and returns the value given here.
def self.model_class(model_class)
  define_singleton_method(:model_class) { model_class }
end

Instance Method Details

#chunk_query(opts = {}) ⇒ Object



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/skynet/skynet_active_record_extensions.rb', line 135

# Runs the boundary query that splits the table into id ranges of roughly
# batch_size rows and returns the result of select_all.
#
# The SQL relies on MySQL-style user variables: @t1 counts matching rows and
# only every batch_size-th row is kept; for each kept row the query selects
# "cnt" (a running counter starting at 0), "first" (previous kept id + 1) and
# "last" (the kept row's id).
#
# opts keys read here:
#   :id         - only rows with a primary key greater than this are scanned
#                 (coerced with to_i, so a missing value means 0)
#   :conditions - extra SQL conditions (String); wrapped in parentheses and
#                 ANDed with the boundary conditions
#   :joins      - raw SQL inserted after the table name
#   :limit      - optional LIMIT for the boundary rows
#
# The opts hash passed in is not modified.
def chunk_query(opts={})
  opts = opts.clone
  start_id = opts[:id].to_i
  boundary = "#{table_name}.#{primary_key} > #{start_id} AND ((@t1:=(@t1+1) % #{batch_size})=0)"

  if opts[:conditions].nil? or opts[:conditions].empty?
    opts[:conditions] = boundary
  else
    # Parenthesise the caller's conditions: AND binds tighter than OR, so an
    # unwrapped "a OR b" would escape the id and counter filters.
    opts[:conditions] = "(" + opts[:conditions] + ") AND " + boundary
  end
  limit = opts[:limit] ? "LIMIT #{opts[:limit]}" : nil

  # Reset the user variables before every boundary query.
  execute('select @t1:=0, @t2:=-1, @t3:=0, @t4:=0')
  sql = "select @t2:=(@t2+1) as cnt, ((@t3:=@t4)+1) as first, @t4:=#{table_name}.#{primary_key} as last from #{table_name} #{opts[:joins]} where #{opts[:conditions]} ORDER BY #{table_name}.#{primary_key} #{limit}"
  select_all(sql)
end

#each ⇒ Object



210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/skynet/skynet_active_record_extensions.rb', line 210

# Splits the records selected by find_args into id ranges (via each_range) and
# submits them to run_job_for_batch, at most MAX_BATCHES_PER_JOB ranges per
# job. Returns the result of the final run_job_for_batch call.
#
# Every queued item is
#   [first_id, last_id, copy of find_args, model_class, handler]
# where handler is the given block when one was passed, otherwise the string
# form of klass_or_method (model_class when that argument is omitted).
def map(klass_or_method=nil,&block)
  target = klass_or_method || model_class
  logger = Skynet::Logger.get

  pending = []
  each_range(find_args) do |range, _pass|
    item = [range['first'].to_i, range['last'].to_i, find_args.clone, model_class]
    item.push(block_given? ? block : "#{target}")
    pending.push(item)

    # Keep collecting until a full job's worth of ranges has been queued.
    next if pending.size < MAX_BATCHES_PER_JOB

    logger.error "MAX BATCH SIZE EXCEEDED RUNNING: #{pending.size}"
    run_job_for_batch(pending)
    pending = []
  end
  run_job_for_batch(pending)
end

#each_range(opts = {}) ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/skynet/skynet_active_record_extensions.rb', line 102

# Yields id-range rows to the block, together with a pass counter.
#
# Each yielded row is a Hash with the keys "first", "last" and "cnt"; a nil
# "last" marks an open-ended range. The second block argument is the number
# of chunk_query passes completed so far.
#
# opts is copied before use; :id defaults to 0 and :conditions / :joins are
# forwarded to the count and to chunk_query.
def each_range(opts={})
  opts = opts.clone
  # Default the starting id to 0 when the caller did not supply one.
  opts[:id] || opts[:id] = 0
  count = model_klass.count(:all,:conditions => opts[:conditions], :joins => opts[:joins])
  # Small result sets are handled as one open-ended range; the block's return
  # value is passed back to the caller in this case.
  if count <= batch_size
    return yield({"first" => 0, "last" => nil, "cnt" => 0}, 0)
  end

  rows = chunk_query(opts)
  # log.error "ROWS, #{rows.pretty_print_inspect}"


  ii = 0
  # No boundary rows at all: fall back to a single open-ended range.
  if rows.empty?
    rows = [{"first" => 0, "last" => nil, "cnt" => ii}]
  end
  last_row = nil
  while rows.any?
    rows.each do |record|
      last_row = record
      yield record, ii
    end
    ii +=1
    # An open-ended row already covers everything that follows.
    return if last_row["last"].nil?
    # Ask for the next set of boundaries, starting after the last id seen.
    # NOTE(review): chunk_query resets its SQL variables on every call, so the
    # first row of a follow-up pass presumably reports "first" relative to 0
    # rather than to this id -- confirm.
    rows = chunk_query(opts.merge(:id => rows.last["last"]))
  end

  # Emit a final open-ended range after the last boundary, but only when the
  # last range spans at least batch_size ids.
  # NOTE(review): when ids are densely packed a full chunk looks like it spans
  # batch_size - 1 ids, in which case this catch-all is skipped -- verify that
  # trailing rows are still covered.
  if last_row["last"] and (last_row["last"].to_i - last_row["first"].to_i) >= batch_size
    catchall_row = {"first" => last_row["last"].to_i+1, "last" => nil, "cnt" => ii}
    yield catchall_row, ii
  end
end

#log ⇒ Object



98
99
100
# File 'lib/skynet/skynet_active_record_extensions.rb', line 98

# Instance-level logger accessor: returns whatever Skynet::Logger.get returns.
def log
  Skynet::Logger.get
end

#map(klass_or_method = nil, &block) ⇒ Object



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/skynet/skynet_active_record_extensions.rb', line 183

# Splits the records selected by find_args into id ranges (via each_range) and
# submits them to run_job_for_batch, at most MAX_BATCHES_PER_JOB ranges per
# job. Returns the result of the final run_job_for_batch call.
#
# Every queued item is
#   [first_id, last_id, copy of find_args, model_class, handler]
# where handler is the given block when one was passed, otherwise the string
# form of klass_or_method (model_class when that argument is omitted).
def map(klass_or_method=nil,&block)
  target = klass_or_method || model_class
  logger = Skynet::Logger.get

  pending = []
  each_range(find_args) do |range, _pass|
    item = [range['first'].to_i, range['last'].to_i, find_args.clone, model_class]
    item.push(block_given? ? block : "#{target}")
    pending.push(item)

    # Keep collecting until a full job's worth of ranges has been queued.
    next if pending.size < MAX_BATCHES_PER_JOB

    logger.error "MAX BATCH SIZE EXCEEDED RUNNING: #{pending.size}"
    run_job_for_batch(pending)
    pending = []
  end
  run_job_for_batch(pending)
end

#mapreduce ⇒ Object



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/skynet/skynet_active_record_extensions.rb', line 211

# Splits the records selected by find_args into id ranges (via each_range) and
# submits them to run_job_for_batch, at most MAX_BATCHES_PER_JOB ranges per
# job. Returns the result of the final run_job_for_batch call.
#
# Every queued item is
#   [first_id, last_id, copy of find_args, model_class, handler]
# where handler is the given block when one was passed, otherwise the string
# form of klass_or_method (model_class when that argument is omitted).
def map(klass_or_method=nil,&block)
  target = klass_or_method || model_class
  logger = Skynet::Logger.get

  pending = []
  each_range(find_args) do |range, _pass|
    item = [range['first'].to_i, range['last'].to_i, find_args.clone, model_class]
    item.push(block_given? ? block : "#{target}")
    pending.push(item)

    # Keep collecting until a full job's worth of ranges has been queued.
    next if pending.size < MAX_BATCHES_PER_JOB

    logger.error "MAX BATCH SIZE EXCEEDED RUNNING: #{pending.size}"
    run_job_for_batch(pending)
    pending = []
  end
  run_job_for_batch(pending)
end

#model_klass ⇒ Object



78
79
80
# File 'lib/skynet/skynet_active_record_extensions.rb', line 78

# Resolves model_class to a constant with `constantize` and caches the result
# in @model_klass, so the lookup is not repeated once it has succeeded.
def model_klass
  return @model_klass if @model_klass

  @model_klass = model_class.constantize
end

#run_job_for_batch(batches, &block) ⇒ Object



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/skynet/skynet_active_record_extensions.rb', line 161

# Builds a Skynet job for the given batches, runs it and returns the result of
# job.run.
#
# When a block is given, a Skynet::Job is created with the block as :map and
# :local_master => true. Otherwise a Skynet::AsyncJob is created whose
# :map_reduce_class is this object's class name.
def run_job_for_batch(batches,&block)
  settings = {
    :mappers               => 20000,
    :map_data              => batches,
    :name                  => "each #{model_class} MASTER",
    :map_name              => "each #{model_class} MAP",
    :map_timeout           => 60,
    :master_timeout        => 12.hours,
    :master_result_timeout => 60,
    :master_retry          => 0,
    :map_retry             => 0
  }

  if block_given?
    return Skynet::Job.new(settings.merge(:map => block), :local_master => true).run
  end

  Skynet::AsyncJob.new(settings.merge(:map_reduce_class => "#{self.class}")).run
end