Class: PCSV

Inherits:
Object
  • Object
show all
Defined in:
lib/pcsv.rb,
lib/pcsv/version.rb

Constant Summary

# Current version string of the pcsv gem.
VERSION = "0.1.0"

Class Method Summary

Class Method Details

.each(path, options = {}) ⇒ Object

Opens a CSV file and runs the block on each cell in parallel. Returns the parsed CSV data.



# File 'lib/pcsv.rb', lines 13–15

# Opens the CSV file at +path+ and yields every cell to the block from a
# pool of worker threads. Cell values are not written back.
#
# @param path [String] path of the CSV file to read.
# @param options [Hash] options for CSV.read, plus :thread_count
#   (number of worker threads, default 10).
# @yieldparam item [Hash] cell info: :row_index, :col_index, :row, :value.
# @yieldparam mutex [Mutex] the lock PCSV uses for its work queue.
# @return the parsed CSV data.
def self.each(path, options = {}, &block)
  # Forward the block explicitly: a block-less Proc.new was deprecated in
  # Ruby 2.7 and raises ArgumentError on Ruby 3.0+.
  process(:each, path, options, &block)
end

.map(path, options = {}) ⇒ Object

Opens a CSV file, runs the block on each cell in parallel, and replaces each cell with the block's result. Returns the parsed CSV data containing the mapped values.



# File 'lib/pcsv.rb', lines 19–21

# Opens the CSV file at +path+, yields every cell to the block from a pool
# of worker threads, and stores each block result back into its cell.
#
# @param path [String] path of the CSV file to read.
# @param options [Hash] options for CSV.read, plus :thread_count
#   (number of worker threads, default 10).
# @yieldparam item [Hash] cell info: :row_index, :col_index, :row, :value.
# @yieldparam mutex [Mutex] the lock PCSV uses for its work queue.
# @yieldreturn the new value for the cell.
# @return the parsed CSV data with the mapped values.
def self.map(path, options = {}, &block)
  # Forward the block explicitly: a block-less Proc.new was deprecated in
  # Ruby 2.7 and raises ArgumentError on Ruby 3.0+.
  process(:map, path, options, &block)
end

.process(action, path, options = {}) ⇒ Object

Performs the given action (:each or :map) on each cell of a CSV file in parallel.



# File 'lib/pcsv.rb', lines 24–74

# Reads the CSV file at +path+ and runs the given block once per cell,
# spreading the cells across a pool of worker threads.
#
# @param action [Symbol] :map writes each block result back into its cell;
#   any other value (PCSV.each passes :each) leaves the cells untouched.
# @param path [String] path of the CSV file to read.
# @param options [Hash] options forwarded to CSV.read, plus :thread_count
#   (number of worker threads, default 10, at least 1). The caller's hash
#   is not modified.
# @yieldparam item [Hash] :row_index, :col_index, :row (the parsed row) and
#   :value (the cell's value as read).
# @yieldparam mutex [Mutex] the lock guarding the work queue and :map
#   write-backs; the block may use it to serialize its own shared state.
# @return the parsed CSV data (an Array of Arrays, or a CSV::Table when
#   headers are enabled).
# @raise [ArgumentError] if no block is given.
def self.process(action, path, options = {})
  raise ArgumentError, "no block given" unless block_given?

  # Copy before removing :thread_count so the caller's hash is untouched
  # and CSV.read never sees an option it does not understand.
  csv_options = options.dup
  # With zero (or negative) threads no cell would ever be processed.
  thread_count = [(csv_options.delete(:thread_count) || 10).to_i, 1].max

  # Open CSV & build a worker queue. CSV.read takes keyword options; a
  # positional Hash raises ArgumentError on Ruby 3.
  csv = CSV.read(path, **csv_options)
  queue = []
  csv.each_with_index do |row, row_index|
    # Rows are plain Arrays unless headers are enabled (then CSV::Row).
    fields = row.respond_to?(:fields) ? row.fields : row
    fields.each_with_index do |field, col_index|
      queue << { row_index: row_index, col_index: col_index, row: row, value: field }
    end
  end

  # Launch threads and iterate over queue until it's done.
  mutex = Mutex.new
  threads = Array.new(thread_count) do
    Thread.new do
      # Mutex#synchronize returns the block's value: the next item, or nil
      # once the queue is drained.
      while (item = mutex.synchronize { queue.shift })
        begin
          result = yield item, mutex
          mutex.synchronize { item[:row][item[:col_index]] = result } if action == :map
        rescue StandardError => e
          # Best-effort: report the failing cell and keep processing the rest.
          warn("[ERROR] #{e.message} [R#{item[:row_index]},C#{item[:col_index]}]")
        end
      end
    end
  end

  threads.each(&:join)
  csv
end