Class: DataAnon::Strategy::Base

Inherits:
Object
  • Object
show all
Includes:
Utils::Logging
Defined in:
lib/strategy/base.rb

Direct Known Subclasses

Blacklist, MongoDB::Whitelist, Whitelist

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Utils::Logging

#logger, #logger=

Constructor Details

#initialize(source_database, destination_database, name, user_strategies) ⇒ Base

Returns a new instance of Base.



8
9
10
11
12
13
14
15
16
17
# File 'lib/strategy/base.rb', line 8

def initialize source_database, destination_database, name, user_strategies
  @name = name
  @user_strategies = user_strategies
  @fields = {}
  @source_database = source_database
  @destination_database = destination_database
  @fields_missing_strategy = DataAnon::Core::FieldsMissingStrategy.new name
  @errors = DataAnon::Core::TableErrors.new(@name)
  @primary_keys = []
end

Instance Attribute Details

#errorsObject

Returns the value of attribute errors.



6
7
8
# File 'lib/strategy/base.rb', line 6

def errors
  @errors
end

#fieldsObject

Returns the value of attribute fields.



6
7
8
# File 'lib/strategy/base.rb', line 6

def fields
  @fields
end

#fields_missing_strategyObject

Returns the value of attribute fields_missing_strategy.



6
7
8
# File 'lib/strategy/base.rb', line 6

def fields_missing_strategy
  @fields_missing_strategy
end

#user_strategiesObject

Returns the value of attribute user_strategies.



6
7
8
# File 'lib/strategy/base.rb', line 6

def user_strategies
  @user_strategies
end

Class Method Details

.whitelist?Boolean

Returns:

  • (Boolean)


19
20
21
# File 'lib/strategy/base.rb', line 19

def self.whitelist?
  false
end

Instance Method Details

#anonymize(*fields, &block) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/strategy/base.rb', line 56

def anonymize *fields, &block
  if block.nil?
    fields.each { |f| @fields[f] = DataAnon::Strategy::Field::DefaultAnon.new(@user_strategies) }
    temp = self
    return Class.new do
      @temp_fields = fields
      @table_fields = temp.fields
      def self.using field_strategy
        @temp_fields.each { |f| @table_fields[f] = field_strategy }
      end
    end
  else
    fields.each { |f| @fields[f] = DataAnon::Strategy::Field::Anonymous.new(&block) }
  end
end

#batch_size(size) ⇒ Object



32
33
34
# File 'lib/strategy/base.rb', line 32

def batch_size size
  @batch_size = size
end

#continue(&block) ⇒ Object



52
53
54
# File 'lib/strategy/base.rb', line 52

def continue &block
  @continue_block = block
end

#default_strategy(field_name) ⇒ Object



76
77
78
79
# File 'lib/strategy/base.rb', line 76

def default_strategy field_name
  @fields_missing_strategy.missing field_name
  DataAnon::Strategy::Field::DefaultAnon.new(@user_strategies)
end

#dest_tableObject



81
82
83
84
85
86
# File 'lib/strategy/base.rb', line 81

def dest_table
  return @dest_table unless @dest_table.nil?
  table_klass = Utils::DestinationTable.create @name, @primary_keys
  table_klass.establish_connection @destination_database if @destination_database
  @dest_table = table_klass
end

#is_primary_key?(field) ⇒ Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/strategy/base.rb', line 72

def is_primary_key? field
  @primary_keys.select { |key| field == key }.length > 0
end

#limit(limit) ⇒ Object



36
37
38
# File 'lib/strategy/base.rb', line 36

def limit limit
  @limit = limit
end

#primary_key(*fields) ⇒ Object



28
29
30
# File 'lib/strategy/base.rb', line 28

def primary_key *fields
  @primary_keys = fields
end

#processObject



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/strategy/base.rb', line 95

def process
  logger.debug "Processing table #{@name} with fields strategies #{@fields}"
  total = source_table.count
  if total > 0
    progress = progress_bar.new(@name, total)
    if @primary_keys.empty? || !@batch_size.present?
      process_table progress
    elsif @thread_num.present?
      process_table_in_threads progress
    else
      process_table_in_batches progress
    end
    progress.close
  end
  if source_table.respond_to?('clear_all_connections!')
    source_table.clear_all_connections!
  end
end

#process_fields(&block) ⇒ Object



23
24
25
26
# File 'lib/strategy/base.rb', line 23

def process_fields &block
  self.instance_eval &block
  self
end

#process_record_if(index, record) ⇒ Object



187
188
189
190
191
192
# File 'lib/strategy/base.rb', line 187

def process_record_if index, record
  return if @skip_block && @skip_block.call(index, record)
  return if @continue_block && !@continue_block.call(index, record)

  process_record index, record
end

#process_table(progress) ⇒ Object



114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/strategy/base.rb', line 114

def process_table progress
  index = 0

  source_table_limited.each do |record|
    index += 1
    begin
      process_record_if index, record
    rescue => exception
      @errors.log_error record, exception
    end
    progress.show index
  end
end

#process_table_in_batches(progress) ⇒ Object



128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/strategy/base.rb', line 128

def process_table_in_batches progress
  logger.info "Processing table #{@name} records in batch size of #{@batch_size}"
  index = 0

  source_table_limited.find_each(:batch_size => @batch_size) do |record|
    index += 1
    begin
      process_record_if index, record
    rescue => exception
      @errors.log_error record, exception
    end
    progress.show index
  end
end

#process_table_in_threads(progress) ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/strategy/base.rb', line 143

def process_table_in_threads progress
  logger.info "Processing table #{@name} records in batch size of #{@batch_size} [THREADS]"

  index = 0
  threads = []

  source_table.find_in_batches(batch_size: @batch_size) do |records|
    until threads.count(&:alive?) <= @thread_num
      thr = threads.delete_at 0
      thr.join
      progress.show index
    end

    thr = Thread.new {
      records.each do |record|
        begin
          process_record_if index, record
          index += 1
        rescue => exception
          puts exception.inspect
          @errors.log_error record, exception
        end
      end
    }
    threads << thr
  end

  until threads.empty?
    thr = threads.delete_at 0
    thr.join
    progress.show index
  end
end

#progress_barObject



194
195
196
# File 'lib/strategy/base.rb', line 194

def progress_bar
  @progress_bar || DataAnon::Utils::ProgressBar
end

#progress_bar_class(progress_bar) ⇒ Object



198
199
200
# File 'lib/strategy/base.rb', line 198

def progress_bar_class progress_bar
  @progress_bar = progress_bar
end

#skip(&block) ⇒ Object



48
49
50
# File 'lib/strategy/base.rb', line 48

def skip &block
  @skip_block = block
end

#source_tableObject



88
89
90
91
92
93
# File 'lib/strategy/base.rb', line 88

def source_table
  return @source_table unless @source_table.nil?
  table_klass = Utils::SourceTable.create @name, @primary_keys
  table_klass.establish_connection @source_database
  @source_table = table_klass
end

#source_table_limitedObject



177
178
179
180
181
182
183
184
185
# File 'lib/strategy/base.rb', line 177

def source_table_limited
  @source_table_limited ||= begin
    if @limit.present?
      source_table.all.limit(@limit).order(created_at: :desc)
    else
      source_table.all
    end
  end
end

#thread_num(thread_num) ⇒ Object



40
41
42
# File 'lib/strategy/base.rb', line 40

def thread_num thread_num
  @thread_num = thread_num
end

#whitelist(*fields) ⇒ Object



44
45
46
# File 'lib/strategy/base.rb', line 44

def whitelist *fields
  fields.each { |f| @fields[f] = DataAnon::Strategy::Field::Whitelist.new }
end