Class: Xcopier::Transformer

Inherits:
Actor
  • Object
show all
Defined in:
lib/xcopier/transformer.rb

Instance Attribute Summary collapse

Attributes inherited from Actor

#__queue, #copier, #parent, #result, #thread

Instance Method Summary collapse

Methods inherited from Actor

#__work__, spawn!, #tell, #terminate!, #wait

Constructor Details

#initialize(input, output, *rest) ⇒ Transformer

Returns a new instance of Transformer.



10
11
12
13
14
# File 'lib/xcopier/transformer.rb', line 10

# Creates a transformer that sits between two queue-like endpoints.
#
# @param input [#pop] source the transformer reads chunks from
# @param output [#push] destination the transformer writes results to
# @param rest [Array] remaining positional arguments, forwarded unchanged
#   to the superclass constructor
def initialize(input, output, *rest)
  @input, @output = input, output
  super(*rest)
end

Instance Attribute Details

#input ⇒ Object (readonly)

Returns the value of attribute input.



8
9
10
# File 'lib/xcopier/transformer.rb', line 8

# Reader for the input endpoint given to the constructor.
#
# @return [Object] the current value of @input
def input = @input

#operation ⇒ Object (readonly)

Returns the value of attribute operation.



8
9
10
# File 'lib/xcopier/transformer.rb', line 8

# Reader for the operation currently held by this transformer.
#
# @return [Object, nil] the current value of @operation
def operation = @operation

#output ⇒ Object (readonly)

Returns the value of attribute output.



8
9
10
# File 'lib/xcopier/transformer.rb', line 8

# Reader for the output endpoint given to the constructor.
#
# @return [Object] the current value of @output
def output = @output

Instance Method Details

#on_error(error) ⇒ Object



27
28
29
30
# File 'lib/xcopier/transformer.rb', line 27

# Error hook: logs the failure, then reports it to the parent.
#
# @param error [#message] the error being reported
# @return [Object] whatever parent.tell returns
def on_error(error)
  debug("Transformer#error: #{error.message}")
  notification = [:error, error]
  parent.tell(notification)
end

#on_message(message) ⇒ Object



16
17
18
19
20
21
22
23
24
25
# File 'lib/xcopier/transformer.rb', line 16

# Message hook: dispatches an incoming message.
#
# Only the shape [:transform, Operation] is accepted; the matched operation
# is logged and handed to #process. Anything else is logged and rejected.
#
# @param message [Object] the incoming message
# @return [Object] the result of #process for a :transform message
# @raise [UnknownMessageError] when the message has any other shape
def on_message(message)
  case message
  in [:transform, Operation => op]
    debug("Transformer#message: type=transform operation=#{op.inspect}")
    process(op)
  else
    description = message.inspect
    debug("Transformer#message: type=unknown message=#{description}")
    raise UnknownMessageError, "Unknown message: #{description}"
  end
end

#process(operation) ⇒ Object



32
33
34
35
36
37
# File 'lib/xcopier/transformer.rb', line 32

# Runs one operation from start to finish.
#
# Calls #setup with the operation, then #work. #teardown always runs
# afterwards, including when either of those raises.
#
# @param operation [Object] the operation to process
# @return [Object] the return value of #work
def process(operation)
  begin
    setup(operation)
    work
  ensure
    teardown
  end
end

#setup(operation) ⇒ Object



69
70
71
# File 'lib/xcopier/transformer.rb', line 69

# Stores the given operation in @operation.
#
# @param operation [Object] the operation about to be processed
# @return [Object] the operation that was stored
def setup(operation) = (@operation = operation)

#teardown ⇒ Object



73
74
75
# File 'lib/xcopier/transformer.rb', line 73

# Clears the stored operation by resetting @operation to nil.
#
# @return [nil]
def teardown = (@operation = nil)

#transform(chunk) ⇒ Object



55
56
57
58
59
# File 'lib/xcopier/transformer.rb', line 55

# Transforms every record of a chunk.
#
# @param chunk [#map] the records to transform
# @return [Array] one #transform_record result per record, in order
def transform(chunk)
  chunk.map { |row| transform_record(row) }
end

#transform_record(record) ⇒ Object



61
62
63
64
65
66
67
# File 'lib/xcopier/transformer.rb', line 61

# Builds a new hash from a record's key/value pairs.
#
# Each value is passed through apply_overrides (which also receives the key
# and the whole record) and then through apply_anonymization. Keys are kept
# as they are.
#
# @param record [Hash] the record to transform
# @return [Hash] a new hash with the same keys and the processed values
def transform_record(record)
  record.to_h do |column, original|
    overridden = apply_overrides(original, column, record)
    [column, apply_anonymization(overridden, column)]
  end
end

#work ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/xcopier/transformer.rb', line 39

# Moves chunks from input to output until the :done sentinel arrives.
#
# Every chunk popped from input is run through #transform and pushed to
# output. When :done is popped it is forwarded to output, @operation is
# cleared, and the loop stops.
#
# @return [nil]
def work
  loop do
    chunk = input.pop

    unless chunk == :done
      debug("Transformer: transforming #{chunk.size} records")
      output.push(transform(chunk))
      next
    end

    debug("Transformer: done")
    output.push(:done)
    @operation = nil
    break
  end
end