Class: Xcopier::Writer

Inherits:
Actor
  • Object
show all
Defined in:
lib/xcopier/writer.rb

Instance Attribute Summary collapse

Attributes inherited from Actor

#__queue, #copier, #parent, #result, #thread

Instance Method Summary collapse

Methods inherited from Actor

#__work__, spawn!, #tell, #terminate!, #wait

Constructor Details

#initialize(queue, *rest) ⇒ Writer

Returns a new instance of Writer.



9
10
11
12
# File 'lib/xcopier/writer.rb', line 9

def initialize(queue, *rest)
  @queue = queue
  super(*rest)
end

Instance Attribute Details

#operationObject (readonly)

Returns the value of attribute operation.



7
8
9
# File 'lib/xcopier/writer.rb', line 7

def operation
  @operation
end

#queueObject (readonly)

Returns the value of attribute queue.



7
8
9
# File 'lib/xcopier/writer.rb', line 7

def queue
  @queue
end

Instance Method Details

#on_error(error) ⇒ Object



25
26
27
28
# File 'lib/xcopier/writer.rb', line 25

def on_error(error)
  debug "Writer#error: #{error.message}"
  parent.tell([:error, error])
end

#on_message(message) ⇒ Object



14
15
16
17
18
19
20
21
22
23
# File 'lib/xcopier/writer.rb', line 14

def on_message(message)
  case message
  in [:write, Operation => operation]
    debug "Writer#message: type=write operation=#{operation.inspect}"
    process(operation)
  else
    debug "Writer#message: type=unknown message=#{message.inspect}"
    raise UnknownMessageError, "Unknown message: #{message.inspect}"
  end
end

#process(operation) ⇒ Object



30
31
32
33
34
35
# File 'lib/xcopier/writer.rb', line 30

def process(operation)
  setup(operation)
  work
ensure
  teardown
end

#setup(operation) ⇒ Object



57
58
59
# File 'lib/xcopier/writer.rb', line 57

def setup(operation)
  @operation = operation
end

#teardownObject



61
62
63
# File 'lib/xcopier/writer.rb', line 61

def teardown
  @operation = nil
end

#workObject



37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/xcopier/writer.rb', line 37

def work
  loop do
    chunk = queue.pop
    if chunk == :done
      debug "Writer: done"
      @operation = nil
      parent.tell(:done)
      break
    end
    write(chunk)
  end
end

#write(records) ⇒ Object



50
51
52
53
54
55
# File 'lib/xcopier/writer.rb', line 50

def write(records)
  ApplicationRecord.connected_to(shard: :xcopier, role: :writing) do
    debug "Writer: writing #{records.size} records"
    operation.model.insert_all(records)
  end
end