Class: ConcurrentPipeline::Pipelines::Processors::Synchronous

Inherits:
Object
  • Object
show all
Defined in:
lib/concurrent_pipeline/pipelines/processors/synchronous.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(store:, producers:, before_process_hooks: [], timers: []) ⇒ Synchronous

Returns a new instance of Synchronous.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/concurrent_pipeline/pipelines/processors/synchronous.rb', line 19

# Builds a processor around the given collaborators.
#
# @param store the store that is passed to each producer's +records+ call
# @param producers collection of producers; each must respond to +records(store)+
# @param before_process_hooks hooks kept for later use (not invoked in this method)
# @param timers timer objects; +call+ reads +interval+ and +block+ from each
def initialize(store:, producers:, before_process_hooks: [], timers: [])
  # Caller-supplied collaborators.
  @store = store
  @producers = producers
  @before_process_hooks = before_process_hooks
  @timers = timers

  # Per-instance bookkeeping, always starting from a clean slate.
  @locker = Locker.new
  @errors = []
  @completed = 0
  @start_time = nil # assigned when +call+ starts
end

Instance Attribute Details

#before_process_hooks ⇒ Object (readonly)

Returns the value of attribute before_process_hooks.



11
12
13
# File 'lib/concurrent_pipeline/pipelines/processors/synchronous.rb', line 11

# Read-only accessor for the hooks stored in @before_process_hooks.
def before_process_hooks = @before_process_hooks

#errors ⇒ Object (readonly)

Returns the value of attribute errors.



11
12
13
# File 'lib/concurrent_pipeline/pipelines/processors/synchronous.rb', line 11

# Read-only accessor for the collection held in @errors.
def errors = @errors

#locker ⇒ Object (readonly)

Returns the value of attribute locker.



11
12
13
# File 'lib/concurrent_pipeline/pipelines/processors/synchronous.rb', line 11

# Read-only accessor for the object held in @locker.
def locker = @locker

#producers ⇒ Object (readonly)

Returns the value of attribute producers.



11
12
13
# File 'lib/concurrent_pipeline/pipelines/processors/synchronous.rb', line 11

# Read-only accessor for the producers held in @producers.
def producers = @producers

#store ⇒ Object (readonly)

Returns the value of attribute store.



11
12
13
# File 'lib/concurrent_pipeline/pipelines/processors/synchronous.rb', line 11

# Read-only accessor for the store held in @store.
def store = @store

#timers ⇒ Object (readonly)

Returns the value of attribute timers.



11
12
13
# File 'lib/concurrent_pipeline/pipelines/processors/synchronous.rb', line 11

# Read-only accessor for the timers held in @timers.
def timers = @timers

Class Method Details

.call ⇒ Object



7
8
9
# File 'lib/concurrent_pipeline/pipelines/processors/synchronous.rb', line 7

# Convenience entry point: forwards every argument to +new+ and immediately
# invokes +call+ on the resulting instance, returning whatever that returns.
def self.call(...)
  processor = new(...)
  processor.call
end

Instance Method Details

#call ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/concurrent_pipeline/pipelines/processors/synchronous.rb', line 35

# Runs the processor inside an Async block and waits for it to finish.
#
# Steps, in order:
# 1. Record the current time in @start_time.
# 2. Spawn one child task per timer. Each child loops forever: it sleeps for
#    +timer.interval+, then calls +timer.block+ with the value of
#    +build_stats+.
# 3. Call +enqueue_all+ repeatedly until it returns a falsy value.
# 4. Stop every timer task, whether or not step 3 raised.
#
# @return the result of waiting on the Async block, whose final expression
#   is +Result.new(errors:)+
def call
  @start_time = Time.now

  reactor = Async do |root|
    # Background tickers, one per configured timer.
    tickers = timers.map do |timer|
      root.async {
        loop {
          sleep(timer.interval)
          begin
            snapshot = build_stats
            timer.block.call(snapshot)
          rescue StandardError
            # Intentionally swallowed: a failing timer callback must not
            # break the pipeline.
          end
        }
      }
    end

    begin
      while enqueue_all
        # Nothing to do here; enqueue_all does the work and its return
        # value decides whether another pass is needed.
      end
      Result.new(errors:)
    ensure
      tickers.each(&:stop)
    end
  end

  reactor.wait
end

#queue_size ⇒ Object



62
63
64
65
66
67
68
69
70
# File 'lib/concurrent_pipeline/pipelines/processors/synchronous.rb', line 62

# Counts the records, across all producers, that are not currently locked.
#
# locker.locks cannot be used for this: records are not all enqueued up
# front, they are processed one by one, so each record is checked against
# the locker individually.
#
# @return [Integer] total number of unlocked records
def queue_size
  unlocked_per_producer = producers.map do |producer|
    producer.records(store).count { |record| !locker.locked?(producer:, record:) }
  end

  unlocked_per_producer.sum
end