Class: DTWorker

Inherits:
Object
  • Object
show all
Includes:
RULE_DSL
Defined in:
lib/datatransit/rule_dsl.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from RULE_DSL

#batch_by, #choose_table, #filter_out_with, #get_all_tables, #load_rule, #migrate, #post_work, #pre_work, #register_primary_key

Constructor Details

#initialize(rule_file) ⇒ DTWorker

Returns a new instance of DTWorker.



108
109
110
111
# File 'lib/datatransit/rule_dsl.rb', line 108

def initialize rule_file
  @rule_file = rule_file
  @batch_size = 500
end

Instance Attribute Details

#batch_sizeObject

Returns the value of attribute batch_size.



106
107
108
# File 'lib/datatransit/rule_dsl.rb', line 106

def batch_size
  @batch_size
end

Instance Method Details

#do_batch_copy(sourceCls, targetCls, task, pk = nil, in_batch = false) ⇒ Object



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/datatransit/rule_dsl.rb', line 163

def do_batch_copy (sourceCls, targetCls, task, pk = nil, in_batch = false)
  count = sourceCls.where(task.search_cond).size.to_f
  return if count <= 0
  
  how_many_batch = (count / @batch_size).ceil
  #the progress bar
  bar = ProgressBar.new(count)
  
  if in_batch 
    0.upto (how_many_batch-1) do |i|  
      sourceCls.where(task.search_cond).find_each(
        start: i * @batch_size, batch_size: @batch_size) do |source_row|
        
        #update progress
        bar.increment!
        
        if task.filter
          next if do_filter_out task.filter, source_row
        end
        target_row = targetCls.new source_row.attributes

        #activerecord would ignore pk field, and the above initialization will result nill primary key.
        #here the original pk is used in the target_row, it is what we need exactly.
        if pk 
          target_row.send( "#{pk}=", source_row.send("#{pk}") )
        end

        target_row.save
      end
    end
  else
    sourceCls.where(task.search_cond).each do |source_row|
      #update progress
      bar.increment!
      
      if task.filter
        next if do_filter_out task.filter, source_row
      end
      target_row = targetCls.new source_row.attributes

      #activerecord would ignore pk field, and the above initialization will result nill primary key.
      #here the original pk is used in the target_row, it is what we need exactly.
      if pk 
        target_row.send( "#{pk}=", source_row.send("#{pk}") )
      end

      target_row.save
    end
  end
  
  
end

#do_filter_out(filter, row) ⇒ Object



216
217
218
219
220
221
# File 'lib/datatransit/rule_dsl.rb', line 216

def do_filter_out filter, row
  if filter.call row
    return true
  end
  false
end

#do_task(task) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/datatransit/rule_dsl.rb', line 127

def do_task task
  pks = task.pk#a context-free-variable in context switches
  tables = task.tables
  
  tables.each do |tbl|
    sourceCls = Class.new(DataTransit::Source::SourceBase) do
      self.table_name = tbl
    end

    #columns = sourceCls.columns.map(&:name).map(&:downcase)
    columns = sourceCls.columns
    pk_column = get_pk_column(columns, pks)
    if pk_column != nil
      pk, pk_type = pk_column.name, pk_column.type
    else
      pk, pk_type = nil, nil
    end

    sourceCls.instance_eval( "self.primary_key = \"#{pk}\"") if pk != nil

    targetCls = Class.new(DataTransit::Target::TargetBase) do
      self.table_name = tbl
    end
    targetCls.instance_eval( "self.primary_key = \"#{pk}\"") if pk != nil

    print "\ntable ", tbl, ":\n"
    do_user_ar_proc targetCls, task.pre_work if task.pre_work
    do_batch_copy sourceCls, targetCls, task, pk, pk_type=="integer"
    do_user_ar_proc targetCls, task.post_work if task.post_work
  end
end

#do_user_ar_proc(targetCls, proc) ⇒ Object



159
160
161
# File 'lib/datatransit/rule_dsl.rb', line 159

def do_user_ar_proc targetCls, proc
  proc.call targetCls
end

#do_workObject



117
118
119
120
121
122
123
124
125
# File 'lib/datatransit/rule_dsl.rb', line 117

def do_work
  #we copy all or nothing. a mess is not welcome here
  #on failures, transaction will rollback automatically
  DataTransit::Target::TargetBase.transaction do
    @taskset.each do |task|
      do_task task
    end
  end
end

#load_workObject



113
114
115
# File 'lib/datatransit/rule_dsl.rb', line 113

def load_work
  load_rule @rule_file
end