Class: Xcopier::Runner

Inherits:
Actor
  • Object
show all
Defined in:
lib/xcopier/runner.rb

Instance Attribute Summary collapse

Attributes inherited from Actor

#__queue, #parent, #result, #thread

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Actor

#__work__, spawn!, #tell, #terminate!, #wait

Constructor Details

#initialize(copier) ⇒ Runner

Returns a new instance of Runner.



22
23
24
25
26
27
28
29
30
# File 'lib/xcopier/runner.rb', line 22

def initialize(copier)
  @source_queue = Queue.new
  @destination_queue = Queue.new
  @reader = Reader.spawn!(source_queue, copier).tap { |actor| actor.parent = self }
  @transformer = Transformer.spawn!(source_queue, destination_queue, copier).tap { |actor| actor.parent = self }
  @writer = Writer.spawn!(destination_queue, copier).tap { |actor| actor.parent = self }
  @index = -1
  super
end

Instance Attribute Details

#copierObject (readonly)

Returns the value of attribute copier.



10
11
12
# File 'lib/xcopier/runner.rb', line 10

def copier
  @copier
end

#destination_queueObject (readonly)

Returns the value of attribute destination_queue.



10
11
12
# File 'lib/xcopier/runner.rb', line 10

def destination_queue
  @destination_queue
end

#indexObject

Returns the value of attribute index.



11
12
13
# File 'lib/xcopier/runner.rb', line 11

def index
  @index
end

#promiseObject (readonly)

Returns the value of attribute promise.



10
11
12
# File 'lib/xcopier/runner.rb', line 10

def promise
  @promise
end

#readerObject (readonly)

Returns the value of attribute reader.



10
11
12
# File 'lib/xcopier/runner.rb', line 10

def reader
  @reader
end

#source_queueObject (readonly)

Returns the value of attribute source_queue.



10
11
12
# File 'lib/xcopier/runner.rb', line 10

def source_queue
  @source_queue
end

#transformerObject (readonly)

Returns the value of attribute transformer.



10
11
12
# File 'lib/xcopier/runner.rb', line 10

def transformer
  @transformer
end

#writerObject (readonly)

Returns the value of attribute writer.



10
11
12
# File 'lib/xcopier/runner.rb', line 10

def writer
  @writer
end

Class Method Details

.run(copier) ⇒ Object



13
14
15
16
17
18
19
20
# File 'lib/xcopier/runner.rb', line 13

def self.run(copier)
  runner = spawn!(copier)
  runner.tell(:run)
  ret = runner.wait
  raise ret if ret.is_a?(Exception)

  ret
end

Instance Method Details

#current_operationObject



77
78
79
# File 'lib/xcopier/runner.rb', line 77

def current_operation
  copier.operations[index]
end

#finish(message = nil) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/xcopier/runner.rb', line 64

def finish(message = nil)
  debug "Runner#finish: message=#{message.inspect}"
  self.result = message

  source_queue.push(:done)
  destination_queue.push(:done)

  reader.terminate!
  transformer.terminate!
  writer.terminate!
  terminate!
end

#on_error(error) ⇒ Object



48
49
50
51
# File 'lib/xcopier/runner.rb', line 48

def on_error(error)
  debug "Runner#error: #{error.message}"
  finish(error)
end

#on_message(message) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/xcopier/runner.rb', line 32

def on_message(message)
  case message
  in :run
    debug "Runner#message: type=run"
    process
  in :done
    process
  in [:error, e]
    debug "Runner#message: type=error error=#{e.message}"
    finish(e)
  else
    debug "Runner#message: type=unknown message=#{message.inspect}"
    raise UnknownMessageError, "Unknown message: #{message.inspect}"
  end
end

#processObject



53
54
55
56
57
58
59
60
61
62
# File 'lib/xcopier/runner.rb', line 53

def process
  self.index += 1
  if current_operation.nil?
    finish(true)
  else
    reader.tell([:read, current_operation])
    transformer.tell([:transform, current_operation])
    writer.tell([:write, current_operation])
  end
end