Class: Xcopier::Writer
Instance Attribute Summary collapse
Attributes inherited from Actor
#__queue, #copier, #parent, #result, #thread
Instance Method Summary
collapse
Methods inherited from Actor
#__work__, spawn!, #tell, #terminate!, #wait
Constructor Details
#initialize(queue, *rest) ⇒ Writer
Returns a new instance of Writer.
9
10
11
12
|
# File 'lib/xcopier/writer.rb', line 9
# Builds a writer around the queue it will later pop chunks from.
#
# @param queue [Object] stored in @queue and exposed through #queue
# @param rest [Array] remaining positional arguments, passed unchanged to
#   the superclass initializer
def initialize(queue, *rest)
# The queue is stored before delegating to the superclass.
# NOTE(review): presumably so it is already set if the parent initializer
# starts processing — confirm against Actor#initialize.
@queue = queue
super(*rest)
end
|
Instance Attribute Details
#operation ⇒ Object
Returns the value of attribute operation.
7
8
9
|
# File 'lib/xcopier/writer.rb', line 7
# Reader for the operation currently held in @operation.
#
# @return [Object, nil] whatever is stored in @operation; nil when unset
def operation = @operation
|
#queue ⇒ Object
Returns the value of attribute queue.
7
8
9
|
# File 'lib/xcopier/writer.rb', line 7
# Reader for the queue held in @queue.
#
# @return [Object, nil] whatever is stored in @queue; nil when unset
def queue = @queue
|
Instance Method Details
#on_error(error) ⇒ Object
25
26
27
28
|
# File 'lib/xcopier/writer.rb', line 25
# Logs the error's message through +debug+ and then reports the error to
# +parent+ as an <tt>[:error, error]</tt> message.
#
# @param error [#message] the error being reported
# @return [Object] whatever <tt>parent.tell</tt> returns
def on_error(error)
  debug("Writer#error: #{error.message}")
  failure_notice = [:error, error]
  parent.tell(failure_notice)
end
|
#on_message(message) ⇒ Object
14
15
16
17
18
19
20
21
22
23
|
# File 'lib/xcopier/writer.rb', line 14
# Dispatches an incoming message.
#
# A two-element message <tt>[:write, op]</tt> whose second element matches
# +Operation+ is logged and handed to #process. Anything else is logged as
# unknown and rejected.
#
# @param message [Object] the message to dispatch
# @return [Object] the result of #process for a write message
# @raise [UnknownMessageError] when the message does not match the write shape
def on_message(message)
  case message
  in [:write, Operation => op]
    debug "Writer#message: type=write operation=#{op.inspect}"
    process(op)
  else
    rendered = message.inspect
    debug "Writer#message: type=unknown message=#{rendered}"
    raise UnknownMessageError, "Unknown message: #{rendered}"
  end
end
|
#process(operation) ⇒ Object
30
31
32
33
34
35
|
# File 'lib/xcopier/writer.rb', line 30
# Runs one operation from start to finish: #setup, then #work, with
# #teardown always executed afterwards, even when an exception escapes.
#
# @param operation [Object] passed to #setup
# @return [Object] the value returned by #work
def process(operation)
  begin
    setup(operation)
    work
  ensure
    # Cleanup runs on both the normal and the exceptional path.
    teardown
  end
end
|
#setup(operation) ⇒ Object
57
58
59
|
# File 'lib/xcopier/writer.rb', line 57
# Records the given operation in @operation.
#
# @param operation [Object] the operation to store
# @return [Object] the same operation
def setup(operation) = @operation = operation
|
#teardown ⇒ Object
61
62
63
|
# File 'lib/xcopier/writer.rb', line 61
# Clears @operation by setting it to nil.
#
# @return [nil]
def teardown = @operation = nil
|
#work ⇒ Object
37
38
39
40
41
42
43
44
45
46
47
48
|
# File 'lib/xcopier/writer.rb', line 37
# Pops chunks from +queue+ until the +:done+ sentinel arrives.
#
# Every chunk other than +:done+ is passed to #write. On +:done+ the method
# logs, sets @operation to nil, tells +parent+ +:done+ and leaves the loop.
#
# @return [nil] when the loop is left through the +:done+ branch
def work
  loop do
    chunk = queue.pop

    # Ordinary chunk: write it and go back for the next one.
    unless chunk == :done
      write(chunk)
      next
    end

    debug "Writer: done"
    @operation = nil
    parent.tell(:done)
    break
  end
end
|
#write(records) ⇒ Object
50
51
52
53
54
55
|
# File 'lib/xcopier/writer.rb', line 50
# Inserts one chunk of records through <tt>operation.model.insert_all</tt>,
# inside an <tt>ApplicationRecord.connected_to</tt> block for the +:xcopier+
# shard with the +:writing+ role.
#
# @param records [#size] the chunk to insert; its size is logged
# @return [Object] whatever the +connected_to+ call returns
def write(records)
  destination = { shard: :xcopier, role: :writing }

  ApplicationRecord.connected_to(**destination) do
    debug "Writer: writing #{records.size} records"
    operation.model.insert_all(records)
  end
end
|