Class: Xcopier::Transformer
- Inherits:
-
Actor
- Object
- Actor
- Xcopier::Transformer
show all
- Defined in:
- lib/xcopier/transformer.rb
Instance Attribute Summary collapse
Attributes inherited from Actor
#__queue, #copier, #parent, #result, #thread
Instance Method Summary
collapse
Methods inherited from Actor
#__work__, spawn!, #tell, #terminate!, #wait
Constructor Details
#initialize(input, output, *rest) ⇒ Transformer
Returns a new instance of Transformer.
10
11
12
13
14
|
# File 'lib/xcopier/transformer.rb', line 10
def initialize(input, output, *rest)
@input = input
@output = output
super(*rest)
end
|
Instance Attribute Details
Returns the value of attribute input.
8
9
10
|
# File 'lib/xcopier/transformer.rb', line 8
def input
@input
end
|
#operation ⇒ Object
Returns the value of attribute operation.
8
9
10
|
# File 'lib/xcopier/transformer.rb', line 8
def operation
@operation
end
|
#output ⇒ Object
Returns the value of attribute output.
8
9
10
|
# File 'lib/xcopier/transformer.rb', line 8
def output
@output
end
|
Instance Method Details
#on_error(error) ⇒ Object
27
28
29
30
|
# File 'lib/xcopier/transformer.rb', line 27
def on_error(error)
debug "Transformer#error: #{error.message}"
parent.tell([:error, error])
end
|
#on_message(message) ⇒ Object
16
17
18
19
20
21
22
23
24
25
|
# File 'lib/xcopier/transformer.rb', line 16
def on_message(message)
case message
in [:transform, Operation => operation]
debug "Transformer#message: type=transform operation=#{operation.inspect}"
process(operation)
else
debug "Transformer#message: type=unknown message=#{message.inspect}"
raise UnknownMessageError, "Unknown message: #{message.inspect}"
end
end
|
#process(operation) ⇒ Object
32
33
34
35
36
37
|
# File 'lib/xcopier/transformer.rb', line 32
def process(operation)
setup(operation)
work
ensure
teardown
end
|
#setup(operation) ⇒ Object
69
70
71
|
# File 'lib/xcopier/transformer.rb', line 69
def setup(operation)
@operation = operation
end
|
#teardown ⇒ Object
73
74
75
|
# File 'lib/xcopier/transformer.rb', line 73
def teardown
@operation = nil
end
|
55
56
57
58
59
|
# File 'lib/xcopier/transformer.rb', line 55
def transform(chunk)
chunk.map do |record|
transform_record(record)
end
end
|
61
62
63
64
65
66
67
|
# File 'lib/xcopier/transformer.rb', line 61
def transform_record(record)
record.each_with_object({}) do |(key, value), hash|
value = apply_overrides(value, key, record)
value = apply_anonymization(value, key)
hash[key] = value
end
end
|
#work ⇒ Object
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
# File 'lib/xcopier/transformer.rb', line 39
def work
loop do
chunk = input.pop
if chunk == :done
debug "Transformer: done"
output.push(:done)
@operation = nil
break
end
debug "Transformer: transforming #{chunk.size} records"
transformed = transform(chunk)
output.push(transformed)
end
end
|