Class: Xcopier::Reader
Instance Attribute Summary collapse
Attributes inherited from Actor
#__queue, #copier, #parent, #result, #thread
Instance Method Summary
collapse
Methods inherited from Actor
#__work__, spawn!, #tell, #terminate!, #wait
Constructor Details
#initialize(queue, *rest) ⇒ Reader
Returns a new instance of Reader.
9
10
11
12
|
# File 'lib/xcopier/reader.rb', line 9
def initialize(queue, *rest)
@queue = queue
super(*rest)
end
|
Instance Attribute Details
#operation ⇒ Object
Returns the value of attribute operation.
7
8
9
|
# File 'lib/xcopier/reader.rb', line 7
def operation
@operation
end
|
#queue ⇒ Object
Returns the value of attribute queue.
7
8
9
|
# File 'lib/xcopier/reader.rb', line 7
def queue
@queue
end
|
Instance Method Details
#each_chunk ⇒ Object
46
47
48
49
50
51
52
|
# File 'lib/xcopier/reader.rb', line 46
def each_chunk
ApplicationRecord.connected_to(shard: :xcopier, role: :reading) do
operation.scope.in_batches(of: operation.chunk_size) do |relation|
yield operation.model.connection.exec_query(relation.to_sql).to_a
end
end
end
|
#on_error(error) ⇒ Object
25
26
27
28
|
# File 'lib/xcopier/reader.rb', line 25
def on_error(error)
debug "Reader#error: #{error.message}"
parent.tell([:error, error])
end
|
#on_message(message) ⇒ Object
14
15
16
17
18
19
20
21
22
23
|
# File 'lib/xcopier/reader.rb', line 14
def on_message(message)
case message
in [:read, Operation => operation]
debug "Reader#message: type=read operation=#{operation.inspect}"
process(operation)
else
debug "Reader#message: type=unknown message=#{message.inspect}"
raise UnknownMessageError, "Unknown message: #{message.inspect}"
end
end
|
#process(operation) ⇒ Object
30
31
32
33
34
35
|
# File 'lib/xcopier/reader.rb', line 30
def process(operation)
setup(operation)
work
ensure
teardown
end
|
#setup(operation) ⇒ Object
54
55
56
|
# File 'lib/xcopier/reader.rb', line 54
def setup(operation)
@operation = operation
end
|
#teardown ⇒ Object
58
59
60
|
# File 'lib/xcopier/reader.rb', line 58
def teardown
@operation = nil
end
|
#work ⇒ Object
37
38
39
40
41
42
43
44
|
# File 'lib/xcopier/reader.rb', line 37
def work
each_chunk do |chunk|
queue.push(chunk)
debug "Reader: read and pushed #{chunk.size} records"
end
debug "Reader: done"
queue.push(:done)
end
|