Class: Xcopier::Reader

Inherits:
Actor
  • Object
show all
Defined in:
lib/xcopier/reader.rb

Instance Attribute Summary collapse

Attributes inherited from Actor

#__queue, #copier, #parent, #result, #thread

Instance Method Summary collapse

Methods inherited from Actor

#__work__, spawn!, #tell, #terminate!, #wait

Constructor Details

#initialize(queue, *rest) ⇒ Reader

Returns a new instance of Reader.



9
10
11
12
# File 'lib/xcopier/reader.rb', line 9

# Builds a reader bound to +queue+.
#
# @param queue [Object] stored in +@queue+ and exposed through #queue;
#   #work calls +push+ on it for every chunk and for the final +:done+ marker
# @param rest [Array] remaining positional arguments, forwarded unchanged
#   to the superclass constructor
def initialize(queue, *rest)
  # NOTE(review): the queue is assigned before delegating to super —
  # presumably so it is already set if the superclass constructor starts
  # doing work; confirm against Actor#initialize before reordering.
  @queue = queue
  super(*rest)
end

Instance Attribute Details

#operation ⇒ Object (readonly)

Returns the value of attribute operation.



7
8
9
# File 'lib/xcopier/reader.rb', line 7

# Reader for the operation currently being processed.
#
# @return [Object, nil] the value stored by #setup, or +nil+ when no
#   operation is set (before #setup has run or after #teardown)
def operation = @operation

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



7
8
9
# File 'lib/xcopier/reader.rb', line 7

# Reader for the queue handed to the constructor.
#
# @return [Object] the object stored in +@queue+; #work pushes chunks and
#   the +:done+ marker onto it
def queue = @queue

Instance Method Details

#each_chunk ⇒ Object



46
47
48
49
50
51
52
# File 'lib/xcopier/reader.rb', line 46

# Iterates over the current operation's records in batches and yields each
# batch as an array of rows.
#
# The whole iteration runs inside
# +ApplicationRecord.connected_to(shard: :xcopier, role: :reading)+.
# +operation.scope+ is walked with +in_batches+ using
# +operation.chunk_size+ as the batch size; for every batch relation its SQL
# is executed through +operation.model.connection+ and the result is
# converted with +to_a+ before being yielded.
#
# When called without a block an Enumerator is returned instead of failing
# with LocalJumpError on the first batch. The enumerator is lazy: it reads
# #operation only when iterated, so it must be consumed while the operation
# is still set.
#
# @yieldparam rows [Array] the rows of one batch
# @return [Enumerator] when no block is given
# @return [Object] otherwise, whatever +connected_to+ returns
def each_chunk
  return enum_for(:each_chunk) unless block_given?

  ApplicationRecord.connected_to(shard: :xcopier, role: :reading) do
    operation.scope.in_batches(of: operation.chunk_size) do |relation|
      rows = operation.model.connection.exec_query(relation.to_sql)
      yield rows.to_a
    end
  end
end

#on_error(error) ⇒ Object



25
26
27
28
# File 'lib/xcopier/reader.rb', line 25

# Error hook: logs the failure and reports it to the parent.
#
# @param error [Exception] the error being reported; only +message+ is
#   read here, the object itself is passed along
# @return [Object] whatever +parent.tell+ returns
def on_error(error)
  debug("Reader#error: #{error.message}")

  # The parent receives the error object itself, tagged with :error.
  notification = [:error, error]
  parent.tell(notification)
end

#on_message(message) ⇒ Object



14
15
16
17
18
19
20
21
22
23
# File 'lib/xcopier/reader.rb', line 14

# Message hook: dispatches an incoming message.
#
# The only recognised shape is a two-element +[:read, operation]+ whose
# second element is an +Operation+; it is logged and handed to #process.
# Anything else is logged and rejected.
#
# @param message [Object] the received message
# @return [Object] the result of #process for a +:read+ message
# @raise [UnknownMessageError] when the message does not match
def on_message(message)
  case message
  in [:read, Operation => requested]
    debug("Reader#message: type=read operation=#{requested.inspect}")
    process(requested)
  else
    rendered = message.inspect
    debug("Reader#message: type=unknown message=#{rendered}")
    raise UnknownMessageError, "Unknown message: #{rendered}"
  end
end

#process(operation) ⇒ Object



30
31
32
33
34
35
# File 'lib/xcopier/reader.rb', line 30

# Runs one operation from start to finish.
#
# Calls #setup with the operation, then #work; #teardown always runs
# afterwards, including when #setup or #work raises.
#
# @param operation [Object] passed straight to #setup
# @return [Object] the result of #work
def process(operation)
  begin
    setup(operation)
    work
  ensure
    # Runs on both the success and the failure path.
    teardown
  end
end

#setup(operation) ⇒ Object



54
55
56
# File 'lib/xcopier/reader.rb', line 54

# Records +operation+ as the one currently being processed, making it
# available through #operation.
#
# @param operation [Object] the operation to store
# @return [Object] the stored operation
def setup(operation) = (@operation = operation)

#teardown ⇒ Object



58
59
60
# File 'lib/xcopier/reader.rb', line 58

# Forgets the current operation by resetting +@operation+ to +nil+.
#
# @return [nil]
def teardown = (@operation = nil)

#work ⇒ Object



37
38
39
40
41
42
43
44
# File 'lib/xcopier/reader.rb', line 37

# Reads every chunk via #each_chunk and pushes it onto #queue, logging the
# chunk size each time; once all chunks are pushed, logs completion and
# pushes the +:done+ marker.
#
# @return [Object] whatever +queue.push(:done)+ returns
def work
  each_chunk do |rows|
    queue.push(rows)
    debug("Reader: read and pushed #{rows.size} records")
  end

  debug("Reader: done")
  queue.push(:done)
end