Module: Parallax::Collectable
- Included in:
- Collector
- Defined in:
- lib/parallax/collectable.rb
Class Method Summary
-
.included(base) ⇒ Object
Instance Method Summary
-
#all_workers_terminated? ⇒ Boolean
Checks if all workers have terminated.
-
#close ⇒ nil
Closes the collector and its data streams.
-
#close_worker(worker_index) ⇒ nil
Closes a worker.
-
#collect(message) ⇒ Object
Interprets a received message from the worker and executes a method in the collector.
-
#initialize_collector(workers_count) ⇒ Object
Initializes the collectable object with the parameters it needs.
-
#log(worker_index, message) ⇒ nil
Prints a message from the worker.
-
#receive ⇒ String
Reads data from the receiving stream.
-
#rescue(worker_index, error_class, error_message) ⇒ nil
If a worker raises an error, it is rescued and re-raised in the collector.
-
#store(worker_index, object) ⇒ nil
Saves an object in the workers’ data.
Class Method Details
.included(base) ⇒ Object
    # File 'lib/parallax/collectable.rb', line 4

    def self.included(base)
      # @return [Integer] the number of workers running in parallel.
      attr_accessor :workers_count
      # @return [Array] the result of all workers' store method calls.
      attr_accessor :workers_data
      # @return [Integer] the number of completed worker processes.
      attr_accessor :closed_workers_count
      # @return [IO::Pipe] the receiving stream of data.
      attr_accessor :receiving_stream
      # @return [IO::Pipe] the sending stream of data.
      attr_accessor :sending_stream

      ##
      # Initializes the collectable object with the parameters it needs.
      #
      # @param [Integer] workers_count the number of workers running in parallel.
      #
      # @return [Object] the instance of the including object.
      def initialize_collector(workers_count)
        @workers_count = workers_count
        @closed_workers_count = 0
        @receiving_stream, @sending_stream = IO.pipe
        @workers_data = []
        self
      end

      ##
      # Reads data from the receiving stream.
      #
      # @return [String] the received data.
      def receive
        self.collect @receiving_stream.gets.chomp.gsub("\t", "\n")
      end

      ##
      # Interprets a received message from the worker and
      # executes a method in the collector.
      #
      # @param [String] message the message as a string.
      #
      # @return [Object] the execution of the interpreted method.
      def collect(message)
        worker_index, method, *arguments = YAML.load(message)
        self.send method, worker_index, *arguments
      end

      ##
      # Prints a message from the worker.
      #
      # @param [Integer] worker_index the worker number.
      # @param [String] message the worker message.
      #
      # @return [nil]
      def log(worker_index, message)
        puts message
      end

      ##
      # Saves an object in the workers' data.
      #
      # @param [Integer] worker_index the worker number.
      # @param [Object] object the object.
      #
      # @return [nil]
      def store(worker_index, object)
        workers_data.push [ Time.now, worker_index, object ]
      end

      ##
      # If a worker raises an error, it is rescued and re-raised
      # in the collector.
      #
      # @param [Integer] worker_index the worker number.
      # @param [Class] error_class the class of the error.
      # @param [String] error_message the message of the error.
      #
      # @return [nil]
      def rescue(worker_index, error_class, error_message)
        raise error_class, "Worker #{worker_index} Error: #{error_message}"
      end

      ##
      # Closes a worker.
      #
      # @param [Integer] worker_index the worker number.
      #
      # @return [nil]
      def close_worker(worker_index)
        @closed_workers_count += 1
      end

      ##
      # Closes the collector and its data streams.
      #
      # @return [nil]
      def close
        @receiving_stream.close
        @sending_stream.close
      end

      ##
      # Checks if all workers have terminated.
      #
      # @return [Boolean] whether all workers have terminated.
      def all_workers_terminated?
        @closed_workers_count >= @workers_count
      end
    end
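The methods in this listing can be read as a small message loop: a worker writes one line per message, and the collector reads lines until every worker has closed. The sketch below illustrates that flow under several assumptions. `DemoCollector` is a hypothetical plain class that copies the method bodies from the listing so the example runs without the gem, `send_message` is an invented helper (the sender side is not shown on this page), and threads stand in for the worker processes.

```ruby
require 'yaml'

# Hypothetical stand-in: method bodies copied from the listing above,
# placed in a plain class so the sketch runs without the gem.
class DemoCollector
  attr_reader :workers_data

  def initialize(workers_count)
    @workers_count = workers_count
    @closed_workers_count = 0
    @receiving_stream, @sending_stream = IO.pipe
    @workers_data = []
  end

  # Assumed sender helper (not part of the listing): writes one
  # tab-flattened YAML document per line, in a single write call.
  def send_message(worker_index, method, *arguments)
    line = [worker_index, method, *arguments].to_yaml.gsub("\n", "\t")
    @sending_stream.write(line + "\n")
  end

  def receive
    collect @receiving_stream.gets.chomp.gsub("\t", "\n")
  end

  def collect(message)
    worker_index, method, *arguments = YAML.load(message)
    send(method, worker_index, *arguments)
  end

  def store(worker_index, object)
    workers_data.push [Time.now, worker_index, object]
  end

  def close_worker(worker_index)
    @closed_workers_count += 1
  end

  def all_workers_terminated?
    @closed_workers_count >= @workers_count
  end

  def close
    @receiving_stream.close
    @sending_stream.close
  end
end

collector = DemoCollector.new(2)

# Threads stand in for worker processes in this sketch.
workers = 2.times.map do |index|
  Thread.new do
    collector.send_message(index, 'store', index * 10)
    collector.send_message(index, 'close_worker')
  end
end

# Keep dispatching messages until both workers have reported closing.
collector.receive until collector.all_workers_terminated?
workers.each(&:join)
collector.close

stored = collector.workers_data.map { |_time, index, object| [index, object] }.sort
p stored
```

Because `collect` hands the method name straight to `send`, a message can invoke any method on the collector, so a loop like this is best fed only by trusted workers.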
Instance Method Details
#all_workers_terminated? ⇒ Boolean
Checks if all workers have terminated.
Returns:
- (Boolean) whether all workers have terminated.

    # File 'lib/parallax/collectable.rb', line 109

    def all_workers_terminated?
      @closed_workers_count >= @workers_count
    end
#close ⇒ nil
Closes the collector and its data streams.
    # File 'lib/parallax/collectable.rb', line 100

    def close
      @receiving_stream.close
      @sending_stream.close
    end
#close_worker(worker_index) ⇒ nil
Closes a worker.
    # File 'lib/parallax/collectable.rb', line 92

    def close_worker(worker_index)
      @closed_workers_count += 1
    end
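To illustrate how `close_worker` and `all_workers_terminated?` interact, here is a minimal sketch. `DemoCollector` is a hypothetical stand-in that copies only the two method bodies and the counters they use; it is not the gem's `Collector`.

```ruby
# Hypothetical stand-in that mirrors only the counters documented above.
class DemoCollector
  def initialize(workers_count)
    @workers_count = workers_count
    @closed_workers_count = 0
  end

  # Same body as the documented #close_worker.
  def close_worker(worker_index)
    @closed_workers_count += 1
  end

  # Same body as the documented #all_workers_terminated?.
  def all_workers_terminated?
    @closed_workers_count >= @workers_count
  end
end

collector = DemoCollector.new(2)
collector.close_worker(0)
first_check = collector.all_workers_terminated?   # one of two workers closed
collector.close_worker(1)
second_check = collector.all_workers_terminated?  # both workers closed
puts first_check, second_check
```

Since `close_worker` does not use `worker_index`, the counter tracks the number of calls rather than the number of distinct workers; closing the same worker twice would count twice.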
#collect(message) ⇒ Object
Interprets a received message from the worker and executes a method in the collector.
    # File 'lib/parallax/collectable.rb', line 46

    def collect(message)
      worker_index, method, *arguments = YAML.load(message)
      self.send method, worker_index, *arguments
    end
#initialize_collector(workers_count) ⇒ Object
Initializes the collectable object with the parameters it needs.

    # File 'lib/parallax/collectable.rb', line 23

    def initialize_collector(workers_count)
      @workers_count = workers_count
      @closed_workers_count = 0
      @receiving_stream, @sending_stream = IO.pipe
      @workers_data = []
      self
    end
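Because `initialize_collector` returns `self`, construction and setup can be chained in one expression. The sketch below assumes a hypothetical `DemoCollector` class that copies the documented bodies of `initialize_collector` and `close`; the accessor names match those in the listing.

```ruby
# Hypothetical class; the bodies copy the documented
# #initialize_collector and #close.
class DemoCollector
  attr_accessor :workers_count, :workers_data, :closed_workers_count,
                :receiving_stream, :sending_stream

  def initialize_collector(workers_count)
    @workers_count = workers_count
    @closed_workers_count = 0
    @receiving_stream, @sending_stream = IO.pipe
    @workers_data = []
    self
  end

  def close
    @receiving_stream.close
    @sending_stream.close
  end
end

# Returning self allows construction and setup in a single expression.
collector = DemoCollector.new.initialize_collector(4)
puts collector.workers_count
puts collector.closed_workers_count

# Both pipe ends are open until #close is called.
collector.close
```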
#log(worker_index, message) ⇒ nil
Prints a message from the worker.
    # File 'lib/parallax/collectable.rb', line 58

    def log(worker_index, message)
      puts message
    end
#receive ⇒ String
Reads data from the receiving stream.
    # File 'lib/parallax/collectable.rb', line 35

    def receive
      self.collect @receiving_stream.gets.chomp.gsub("\t", "\n")
    end
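`receive` reads a single line and turns tabs back into newlines before parsing, which suggests that the sender flattens a multi-line YAML document onto one line. The round trip below is a sketch of that idea; the sender side is an assumption, since this page only documents the receiving end.

```ruby
require 'yaml'

reader, writer = IO.pipe

# Assumed sender side: flatten the multi-line YAML document onto one line
# by replacing newlines with tabs, then terminate it with a newline.
payload = [0, 'log', 'hello'].to_yaml
writer.puts payload.gsub("\n", "\t")

# Receiver side, following the documented #receive body.
line = reader.gets.chomp.gsub("\t", "\n")
decoded = YAML.load(line)

writer.close
reader.close
p decoded
```

With this scheme, any literal tab character in the transmitted line is also turned into a newline on the receiving side, so it suits payloads whose encoded form contains no raw tabs.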
#rescue(worker_index, error_class, error_message) ⇒ nil
If a worker raises an error, it is rescued and re-raised in the collector.

    # File 'lib/parallax/collectable.rb', line 82

    def rescue(worker_index, error_class, error_message)
      raise error_class, "Worker #{worker_index} Error: #{error_message}"
    end
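A short sketch of the re-raise: the collector raises an exception of the class reported by the worker and prefixes the message with the worker index. `DemoCollector` is a hypothetical stand-in carrying only the documented method body.

```ruby
# Hypothetical stand-in carrying only the documented #rescue body.
class DemoCollector
  def rescue(worker_index, error_class, error_message)
    raise error_class, "Worker #{worker_index} Error: #{error_message}"
  end
end

begin
  # An explicit receiver is required because `rescue` is also a keyword.
  DemoCollector.new.rescue(2, ArgumentError, 'bad input')
rescue ArgumentError => e
  caught = e.message
end

puts caught
```

Note that `error_class` has to be an exception class (or exception object) by the time it reaches `raise`; passing a plain string there together with a message makes `raise` fail with a `TypeError`.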
#store(worker_index, object) ⇒ nil
Saves an object in the workers’ data.
    # File 'lib/parallax/collectable.rb', line 69

    def store(worker_index, object)
      workers_data.push [ Time.now, worker_index, object ]
    end