Class: ActiveRecord::Mapreduce

Inherits:
Object
  • Object
show all
Defined in:
lib/skynet/skynet_active_record_extensions.rb

Constant Summary collapse

BATCH_SIZE =
1000

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Mapreduce

Returns a new instance of Mapreduce.



67
68
69
70
71
# File 'lib/skynet/skynet_active_record_extensions.rb', line 67

# Builds a Mapreduce from an options hash.
#
# Recognised keys:
#   :find_args   - stored unchanged in @find_args
#   :batch_size  - stored in @batch_size; BATCH_SIZE is used instead when the
#                  value is nil or false
#   :model_class - stored unchanged in @model_class
def initialize(options = {})
  @find_args = options[:find_args]

  @batch_size = options[:batch_size]
  @batch_size = BATCH_SIZE unless @batch_size

  @model_class = options[:model_class]
end

Instance Attribute Details

#batch_size ⇒ Object

Returns the value of attribute batch_size.



64
65
66
# File 'lib/skynet/skynet_active_record_extensions.rb', line 64

# Returns @batch_size. #initialize sets it to the :batch_size option, or to
# BATCH_SIZE when that option is nil or false.
def batch_size
  @batch_size
end

#find_args ⇒ Object

Returns the value of attribute find_args.



64
65
66
# File 'lib/skynet/skynet_active_record_extensions.rb', line 64

# Returns @find_args, the :find_args option given to #initialize. #each passes
# it to #each_range and stores a clone of it in every batch.
def find_args
  @find_args
end

#model_class ⇒ Object

Returns the value of attribute model_class.



65
66
67
# File 'lib/skynet/skynet_active_record_extensions.rb', line 65

# Returns @model_class, the :model_class option given to #initialize.
# #chunk_query calls +constantize+ on it, so it is used as a class name there.
def model_class
  @model_class
end

Class Method Details

.find(*args) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
# File 'lib/skynet/skynet_active_record_extensions.rb', line 77

# Builds a Mapreduce from ActiveRecord-style +find+ arguments.
#
# args - an optional leading non-Hash argument (e.g. :all), which is ignored,
#        followed by an optional options Hash. The :batch_size and
#        :model_class keys are taken out of the options and handed to +new+
#        separately; everything else becomes :find_args.
#
# The caller's Hash is copied before keys are removed, so it is never mutated.
#
# Returns the object built by +new+.
# Raises ArgumentError if the options argument is present but not a Hash.
def self.find(*args)
  # Drop a leading scope such as :all; only the options Hash matters here.
  args.shift unless args.first.is_a?(Hash)

  options = args.first
  unless options.nil? || options.is_a?(Hash)
    raise ArgumentError, "expected an options Hash, got #{options.inspect}"
  end

  # `args = *args` only unwrapped a one-element array on Ruby 1.8; picking the
  # Hash explicitly behaves the same on every Ruby version.
  find_args = options ? options.dup : {}
  batch_size = find_args.delete(:batch_size)
  model_class = find_args.delete(:model_class)

  new(:find_args => find_args, :batch_size => batch_size, :model_class => model_class)
end

.log ⇒ Object



192
193
194
# File 'lib/skynet/skynet_active_record_extensions.rb', line 192

# Returns whatever Skynet::Logger.get provides. .map calls +error+ on it to
# report failures raised while handling a record.
def self.log
  Skynet::Logger.get
end

.map(datas) ⇒ Object



196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/skynet/skynet_active_record_extensions.rb', line 196

# Map step: runs a handler against every record of each id-range batch.
#
# datas - Array of batches, each a 5-element Array as built by #each:
#           [first_id, last_id, find_options, model_class_name, handler]
#         handler is either a String or an object responding to +call+.
#         A String is tried first as a class name (its +each+ class method
#         receives the record) and otherwise as the name of an instance
#         method on the record.
#
# find_options[:conditions], when present, is expected to be an SQL String.
#
# An error raised while handling a record is logged through +log+ and the
# remaining records are still processed. SystemExit, signals and NoMemoryError
# are re-raised. Always returns nil.
def self.map(datas)
  datas.each do |data|
    first_id, last_id, find_options, model_name, handler = data
    model_class = model_name.constantize
    table_name = model_class.table_name

    # last_id only acts as an upper bound when it is greater than first_id.
    # #each stores a nil "last" as 0, which leaves such a batch open-ended.
    # NOTE(review): a batch with first_id == last_id is also treated as
    # open-ended - confirm that is intended.
    id_range = "#{table_name}.id >= #{first_id}"
    id_range += " AND #{table_name}.id <= #{last_id}" if last_id > first_id
    id_range = "(#{id_range})"

    # Work on a copy so the batch data is left untouched, and keep the other
    # find options (e.g. :joins) instead of replacing the whole hash.
    find_options = (find_options || {}).dup
    caller_conditions = find_options[:conditions]
    find_options[:conditions] =
      if caller_conditions.nil? || caller_conditions.empty?
        id_range
      else
        # Parenthesised so an OR in the caller's SQL cannot escape the id range.
        "(#{caller_conditions}) AND #{id_range}"
      end
    find_options[:select] = "#{table_name}.*"

    # Resolve a String handler once per batch. Only the constant lookup is
    # guarded, so a NameError/NoMethodError raised inside the handler itself
    # is reported as-is instead of being mistaken for "no such class".
    handler_class = nil
    if handler.kind_of?(String)
      begin
        handler_class = handler.constantize
      rescue NameError
        handler_class = nil
      end
    end

    model_class.find(:all, find_options).each do |ar_object|
      begin
        if !handler.kind_of?(String)
          handler.call(ar_object)
        elsif handler_class
          handler_class.each(ar_object)
        elsif ar_object.respond_to?(handler.to_sym)
          ar_object.send(handler)
        else
          raise NameError.new("#{handler} is not a class or an instance method in #{model_class}")
        end
      rescue SystemExit, SignalException, NoMemoryError
        # Process-control exceptions must not be swallowed by the catch-all below.
        raise
      rescue Exception => e
        if handler.kind_of?(String)
          log.error("Error in #{handler} #{e.inspect} #{e.backtrace.join("\n")}")
        else
          log.error("Error in #{self} with given block #{e.inspect} #{e.backtrace.join("\n")}")
        end
      end
    end
  end
  nil
end

.model_class(model_class) ⇒ Object



186
187
188
189
190
# File 'lib/skynet/skynet_active_record_extensions.rb', line 186

# Class-level macro: replaces this singleton method with a zero-argument
# +model_class+ reader that returns the value given here.
#
# model_class - the value the generated reader will return.
def self.model_class(model_class)
  eigenclass = class << self; self; end
  eigenclass.send(:define_method, :model_class) { model_class }
end

Instance Method Details

#chunk_query(opts = {}) ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/skynet/skynet_active_record_extensions.rb', line 115

# Finds the id boundaries that split the model's table into batches.
#
# opts - Hash of options:
#        :id         - only rows with an id greater than this are scanned
#                      (nil is treated as 0)
#        :conditions - optional SQL String; wrapped in parentheses and ANDed
#                      with the chunking predicate
#        :joins      - optional SQL fragment inserted after the table name
#        :limit      - optional maximum number of boundary rows returned
#
# The query relies on SQL user variables (@t1..@t4, MySQL-style syntax):
#   @t1 counts rows modulo batch_size, and a row is selected only when the
#       counter wraps to 0;
#   @t2 numbers the selected rows from 0 ("cnt");
#   @t3/@t4 carry the previously selected id, so "first" is that id + 1 and
#       "last" is the selected row's own id.
# All four variables are reset on every call.
#
# Returns the rows from +select_all+, each with "cnt", "first" and "last".
def chunk_query(opts={})
  mc = model_class.constantize
  table_name = mc.table_name

  # to_i keeps the SQL valid when :id is missing or arrives as a String.
  chunk_conditions = "#{table_name}.id > #{opts[:id].to_i} AND ((@t1:=(@t1+1) % #{batch_size})=0)"

  caller_conditions = opts[:conditions]
  where_clause =
    if caller_conditions.nil? || caller_conditions.empty?
      chunk_conditions
    else
      # Parenthesised so an OR in the caller's SQL cannot bypass the chunking
      # predicate; the caller's conditions still come first.
      "(#{caller_conditions}) AND #{chunk_conditions}"
    end
  limit = opts[:limit] ? "LIMIT #{opts[:limit]}" : nil

  mc.connection.execute('select @t1:=0, @t2:=-1, @t3:=0, @t4:=0')
  mc.connection.select_all("select @t2:=(@t2+1) as cnt, ((@t3:=@t4)+1) as first, @t4:=#{table_name}.id as last from #{table_name} #{opts[:joins]} where #{where_clause} ORDER BY #{table_name}.id #{limit}")
end

#each(klass_or_method = nil, &block) ⇒ Object Also known as: mapreduce



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/skynet/skynet_active_record_extensions.rb', line 142

# Splits the model's rows into id-range batches and runs them as a Skynet job.
#
# klass_or_method - name of a handler class or of an instance method on the
#                   records; defaults to model_class. Ignored when a block is
#                   given.
# block           - optional handler called with each record.
#
# Every batch handed to the job is
#   [first_id, last_id, find_args copy, model_class, handler]
# where handler is the block, or klass_or_method converted to a String.
#
# With a block the work runs as a Skynet::Job, otherwise as a
# Skynet::AsyncJob. Returns the result of the job's +run+.
def each(klass_or_method=nil,&block)
  klass_or_method ||= model_class
  handler = block_given? ? block : "#{klass_or_method}"

  # Batches are appended in the order #each_range yields them. Indexing by the
  # yielded "cnt" is unsafe: that counter restarts on every chunk_query pass
  # and the catch-all row reuses the pass number, so entries could overwrite
  # each other or leave nil gaps.
  batches = []
  each_range(find_args) do |ids, _pass|
    batches << [
      ids['first'].to_i,
      ids['last'].to_i,
      find_args.clone,
      model_class,
      handler
    ]
  end

  jobopts = {
    :map_tasks => 20000,
    :map_data => batches,
    :name => "each #{model_class} MASTER",
    :map_name => "each #{model_class} MAP",
    :map_timeout      => 1.hour,
    :reduce_timeout   => 1.hour,
    :master_timeout => 8.hours,
    :master_result_timeout => 1.minute,
    :map_reduce_class => "#{self.class}"
  }

  job_class = block_given? ? Skynet::Job : Skynet::AsyncJob
  job_class.new(jobopts).run
end

#each_range(opts = {}) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/skynet/skynet_active_record_extensions.rb', line 89

# Yields one record per id-range batch, ending with an open-ended tail batch.
#
# opts - find options forwarded to #chunk_query; :id (default 0) is the id
#        after which scanning starts. nil is treated as an empty Hash.
#
# Each yield passes two values:
#   a Hash with "cnt" (0-based batch index, unique across the whole run),
#   "first" and "last" (id bounds; "last" is nil for an open-ended batch),
#   and the number of chunk_query passes completed so far.
#
# When chunk_query finds no boundary at all, a single open-ended batch
# starting at 0 is yielded.
def each_range(opts={})
  opts = (opts || {}).clone
  opts[:id] ||= 0
  rows = chunk_query(opts)

  ii = 0
  if rows.empty?
    yield({"first" => 0, "last" => nil, "cnt" => ii}, ii)
    return
  end

  # chunk_query resets its counters on every call, so on later passes the
  # "cnt" and "first" it reports would start over. Both are therefore derived
  # here from a running index and the previous boundary.
  batch_index = 0
  previous_last = opts[:id].to_i
  while rows.any?
    rows.each do |record|
      yield record.merge("cnt" => batch_index, "first" => previous_last + 1), ii
      previous_last = record["last"].to_i
      batch_index += 1
    end
    ii += 1
    rows = chunk_query(opts.merge(:id => previous_last))
  end

  # Rows after the last boundary (fewer than batch_size of them) form an
  # open-ended tail batch. It is always emitted: whether such rows exist does
  # not depend on how far apart the ids of the last full batch were.
  yield({"first" => previous_last + 1, "last" => nil, "cnt" => batch_index}, ii)
end