Class: ConcurrentPipeline::Pipelines::Processors::Asynchronous

Inherits:
Object
  • Object
show all
Defined in:
lib/concurrent_pipeline/pipelines/processors/asynchronous.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(store:, producers:, concurrency: 5, enqueue_seconds: 0.1, before_process_hooks: [], timers: []) ⇒ Asynchronous

Returns a new instance of Asynchronous.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/concurrent_pipeline/pipelines/processors/asynchronous.rb', line 22

# Captures the caller-supplied settings and resets the per-run bookkeeping.
#
# @param store [Object] stored as-is and exposed through #store
# @param producers [Object] stored as-is and exposed through #producers
# @param concurrency [Integer] limit later handed to Async::Semaphore by #call
# @param enqueue_seconds [Numeric] pause used by #call between scheduling passes
# @param before_process_hooks [Array] stored as-is; exposed through #before_process_hooks
# @param timers [Array] objects responding to #interval and #block; #call
#   starts one background task per entry
def initialize(store:, producers:, concurrency: 5, enqueue_seconds: 0.1, before_process_hooks: [], timers: [])
  # Configuration supplied by the caller.
  @store, @producers = store, producers
  @concurrency, @enqueue_seconds = concurrency, enqueue_seconds
  @before_process_hooks, @timers = before_process_hooks, timers

  # Internal state owned by this instance.
  @locker = Locker.new
  @errors = []
  @completed = 0
  @start_time = nil # assigned when #call begins
end

Instance Attribute Details

#before_process_hooks ⇒ Object (readonly)

Returns the value of attribute before_process_hooks.



12
13
14
# File 'lib/concurrent_pipeline/pipelines/processors/asynchronous.rb', line 12

# Read-only accessor for the +before_process_hooks:+ value given to the
# constructor (an empty Array when the caller supplied none).
def before_process_hooks = @before_process_hooks

#concurrency ⇒ Object (readonly)

Returns the value of attribute concurrency.



12
13
14
# File 'lib/concurrent_pipeline/pipelines/processors/asynchronous.rb', line 12

# Read-only accessor for the +concurrency:+ value given to the constructor
# (5 by default); #call uses it as the Async::Semaphore limit.
def concurrency = @concurrency

#enqueue_seconds ⇒ Object (readonly)

Returns the value of attribute enqueue_seconds.



12
13
14
# File 'lib/concurrent_pipeline/pipelines/processors/asynchronous.rb', line 12

# Read-only accessor for the +enqueue_seconds:+ value given to the constructor
# (0.1 by default); #call sleeps this long between scheduling passes.
def enqueue_seconds = @enqueue_seconds

#errors ⇒ Object (readonly)

Returns the value of attribute errors.



12
13
14
# File 'lib/concurrent_pipeline/pipelines/processors/asynchronous.rb', line 12

# Read-only accessor for the error list, which the constructor initialises to
# an empty Array. #call leaves its loop once this is non-empty and passes it
# to the Result it returns.
def errors = @errors

#locker ⇒ Object (readonly)

Returns the value of attribute locker.



12
13
14
# File 'lib/concurrent_pipeline/pipelines/processors/asynchronous.rb', line 12

# Read-only accessor for the Locker instance created in the constructor;
# #queue_size reads its +locks+ collection.
def locker = @locker

#producers ⇒ Object (readonly)

Returns the value of attribute producers.



12
13
14
# File 'lib/concurrent_pipeline/pipelines/processors/asynchronous.rb', line 12

# Read-only accessor for the required +producers:+ value given to the
# constructor.
def producers = @producers

#store ⇒ Object (readonly)

Returns the value of attribute store.



12
13
14
# File 'lib/concurrent_pipeline/pipelines/processors/asynchronous.rb', line 12

# Read-only accessor for the required +store:+ value given to the constructor.
def store = @store

#timers ⇒ Object (readonly)

Returns the value of attribute timers.



12
13
14
# File 'lib/concurrent_pipeline/pipelines/processors/asynchronous.rb', line 12

# Read-only accessor for the +timers:+ value given to the constructor (an
# empty Array by default). #call starts one background task per entry and
# reads +interval+ and +block+ from each.
def timers = @timers

Class Method Details

.call ⇒ Object



8
9
10
# File 'lib/concurrent_pipeline/pipelines/processors/asynchronous.rb', line 8

# Convenience entry point: forwards every argument to +new+ and immediately
# runs the resulting instance, returning whatever its #call returns.
def self.call(...)
  processor = new(...)
  processor.call
end

Instance Method Details

#call ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/concurrent_pipeline/pipelines/processors/asynchronous.rb', line 42

# Runs the scheduling loop inside an Async task until there is nothing left
# to do or an error has been recorded.
#
# Each pass drops finished tasks, asks +enqueue_all+ (defined elsewhere in
# this class) for more work, and sleeps +enqueue_seconds+ before the next
# pass. One background task per entry in +timers+ repeatedly sleeps
# +timer.interval+ and then calls +timer.block+ with +build_stats+; those
# tasks are stopped when the loop exits, even on an exception.
#
# @return [Result] built from +errors+, obtained by waiting on the Async task
def call
  @start_time = Time.now

  root = Async do |task|
    semaphore = Async::Semaphore.new(concurrency)
    active_tasks = []

    # Start timer tasks
    timer_tasks = timers.map do |timer|
      task.async do
        loop do
          sleep(timer.interval)
          begin
            stats = build_stats
            timer.block.call(stats)
          rescue StandardError
            # Deliberately best-effort: a failing timer callback must not
            # break the pipeline, so the error is dropped.
          end
        end
      end
    end

    begin
      loop do
        # Stop scheduling as soon as any error has been recorded.
        break if errors.any?

        # Clean up finished tasks
        active_tasks.reject!(&:finished?)

        # Try to enqueue more work (only reached when no error was recorded)
        enqueued_any = enqueue_all(semaphore, active_tasks, task)

        # Stop when nothing is being processed AND nothing new was enqueued
        break if active_tasks.empty? && !enqueued_any

        # Yield to allow other tasks to progress
        sleep(enqueue_seconds)
      end
    ensure
      # Stop all timer tasks
      timer_tasks.each(&:stop)
    end

    Result.new(errors:)
  end

  # Wait for the async block to complete and return its value
  root.wait
end

#queue_size ⇒ Object



90
91
92
# File 'lib/concurrent_pipeline/pipelines/processors/asynchronous.rb', line 90

# Reports how many entries +locker.locks+ currently holds.
def queue_size
  held_locks = locker.locks
  held_locks.size
end