Class: Xcopier::Runner
Instance Attribute Summary collapse
-
#copier ⇒ Object
readonly
Returns the value of attribute copier.
-
#destination_queue ⇒ Object
readonly
Returns the value of attribute destination_queue.
-
#index ⇒ Object
Returns the value of attribute index.
-
#promise ⇒ Object
readonly
Returns the value of attribute promise.
-
#reader ⇒ Object
readonly
Returns the value of attribute reader.
-
#source_queue ⇒ Object
readonly
Returns the value of attribute source_queue.
-
#transformer ⇒ Object
readonly
Returns the value of attribute transformer.
-
#writer ⇒ Object
readonly
Returns the value of attribute writer.
Attributes inherited from Actor
#__queue, #parent, #result, #thread
Class Method Summary collapse
Instance Method Summary collapse
- #current_operation ⇒ Object
- #finish(message = nil) ⇒ Object
-
#initialize(copier) ⇒ Runner
constructor
A new instance of Runner.
- #on_error(error) ⇒ Object
- #on_message(message) ⇒ Object
- #process ⇒ Object
Methods inherited from Actor
#__work__, spawn!, #tell, #terminate!, #wait
Constructor Details
#initialize(copier) ⇒ Runner
Returns a new instance of Runner.
22 23 24 25 26 27 28 29 30 |
# File 'lib/xcopier/runner.rb', line 22 def initialize(copier) @source_queue = Queue.new @destination_queue = Queue.new @reader = Reader.spawn!(source_queue, copier).tap { |actor| actor.parent = self } @transformer = Transformer.spawn!(source_queue, destination_queue, copier).tap { |actor| actor.parent = self } @writer = Writer.spawn!(destination_queue, copier).tap { |actor| actor.parent = self } @index = -1 super end |
Instance Attribute Details
#copier ⇒ Object (readonly)
Returns the value of attribute copier.
10 11 12 |
# File 'lib/xcopier/runner.rb', line 10 def copier @copier end |
#destination_queue ⇒ Object (readonly)
Returns the value of attribute destination_queue.
10 11 12 |
# File 'lib/xcopier/runner.rb', line 10 def destination_queue @destination_queue end |
#index ⇒ Object
Returns the value of attribute index.
11 12 13 |
# File 'lib/xcopier/runner.rb', line 11 def index @index end |
#promise ⇒ Object (readonly)
Returns the value of attribute promise.
10 11 12 |
# File 'lib/xcopier/runner.rb', line 10 def promise @promise end |
#reader ⇒ Object (readonly)
Returns the value of attribute reader.
10 11 12 |
# File 'lib/xcopier/runner.rb', line 10 def reader @reader end |
#source_queue ⇒ Object (readonly)
Returns the value of attribute source_queue.
10 11 12 |
# File 'lib/xcopier/runner.rb', line 10 def source_queue @source_queue end |
#transformer ⇒ Object (readonly)
Returns the value of attribute transformer.
10 11 12 |
# File 'lib/xcopier/runner.rb', line 10 def transformer @transformer end |
#writer ⇒ Object (readonly)
Returns the value of attribute writer.
10 11 12 |
# File 'lib/xcopier/runner.rb', line 10 def writer @writer end |
Class Method Details
.run(copier) ⇒ Object
13 14 15 16 17 18 19 20 |
# File 'lib/xcopier/runner.rb', line 13 def self.run(copier) runner = spawn!(copier) runner.tell(:run) ret = runner.wait raise ret if ret.is_a?(Exception) ret end |
Instance Method Details
#current_operation ⇒ Object
77 78 79 |
# File 'lib/xcopier/runner.rb', line 77 def current_operation copier.operations[index] end |
#finish(message = nil) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/xcopier/runner.rb', line 64 def finish( = nil) debug "Runner#finish: message=#{.inspect}" self.result = source_queue.push(:done) destination_queue.push(:done) reader.terminate! transformer.terminate! writer.terminate! terminate! end |
#on_error(error) ⇒ Object
48 49 50 51 |
# File 'lib/xcopier/runner.rb', line 48 def on_error(error) debug "Runner#error: #{error.}" finish(error) end |
#on_message(message) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/xcopier/runner.rb', line 32 def () case in :run debug "Runner#message: type=run" process in :done process in [:error, e] debug "Runner#message: type=error error=#{e.}" finish(e) else debug "Runner#message: type=unknown message=#{.inspect}" raise UnknownMessageError, "Unknown message: #{.inspect}" end end |
#process ⇒ Object
53 54 55 56 57 58 59 60 61 62 |
# File 'lib/xcopier/runner.rb', line 53 def process self.index += 1 if current_operation.nil? finish(true) else reader.tell([:read, current_operation]) transformer.tell([:transform, current_operation]) writer.tell([:write, current_operation]) end end |