Class: ParallelizeSneaqlTransforms

Inherits:
Object
  • Object
show all
Defined in:
lib/sneaql_standard_lib/parallelize.rb

Overview

used to run concurrent sneaql transforms from a threadsafe queue.

Instance Method Summary collapse

Constructor Details

#initialize(queue_to_process, concurrency, logger = nil) ⇒ ParallelizeSneaqlTransforms

initialize object and run concurrent transforms.

Parameters:

  • queue_to_process (Queue)

    queue of hashes with all params needed for transform

  • concurrency (Fixnum)

    number of threads

  • logger (Logger) (defaults to: nil)

    optional logger object



10
11
12
13
14
15
# File 'lib/sneaql_standard_lib/parallelize.rb', line 10

def initialize(queue_to_process, concurrency, logger = nil)
  @logger = logger ? logger : Logger.new(STDOUT)
  @queue_to_process = queue_to_process
  @concurrency = concurrency
  parallelize
end

Instance Method Details

#parallelizeObject

performs the actual parallel execution



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/sneaql_standard_lib/parallelize.rb', line 18

def parallelize
  @logger.info(
    "processing #{@queue_to_process} with a concurrency of #{@concurrency}..."
  )

  threads = []
  @concurrency.times do
    threads << Thread.new do
      # loop until there are no more things to do
      until @queue_to_process.empty?
        begin
          object_to_process = @queue_to_process.pop(true) rescue nil
          # logger.debug(object_to_process)
          t = Sneaql::Transform.new(
            object_to_process,
            @logger
          )
          t.run
        rescue => e
          @logger.error(e.message)
          e.backtrace.each { |b| @logger.error(b) }
        ensure
          @logger.info("finished processing #{object_to_process['transform_name']}")
        end
      end
    end
  end
  threads.each { |t| t.join }
  threads = nil
end