Class: ConcurrentPipeline::Pipelines::Processors::Asynchronous
- Inherits: Object
  - Object
  - ConcurrentPipeline::Pipelines::Processors::Asynchronous
- Defined in: lib/concurrent_pipeline/pipelines/processors/asynchronous.rb
Instance Attribute Summary
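- #before_process_hooks ⇒ Object (readonly)
  Hooks supplied to the constructor. Defaults to an empty array.
- #concurrency ⇒ Object (readonly)
  Limit passed to Async::Semaphore in #call. Defaults to 5.
- #enqueue_seconds ⇒ Object (readonly)
  Number of seconds #call sleeps between enqueue passes. Defaults to 0.1.
- #errors ⇒ Object (readonly)
  Starts as an empty array. #call leaves its loop as soon as this is non-empty and passes it to the returned Result.
- #locker ⇒ Object (readonly)
  The Locker instance created in the constructor. #queue_size reports the size of its locks.
- #producers ⇒ Object (readonly)
  The producers supplied to the constructor.
- #store ⇒ Object (readonly)
  The store supplied to the constructor.
- #timers ⇒ Object (readonly)
  Timers supplied to the constructor. Defaults to an empty array. #call runs one background task per timer that sleeps for timer.interval seconds and then calls timer.block with the current stats; errors raised by the block are ignored.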
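Class Method Summary
- .call(...) ⇒ Object
  Builds a new instance from the given arguments and invokes #call on it.
Instance Method Summary
- #call ⇒ Object
  Runs the enqueue loop inside an Async block and returns a Result carrying the collected errors.
- #initialize(store:, producers:, concurrency: 5, enqueue_seconds: 0.1, before_process_hooks: [], timers: []) ⇒ Asynchronous (constructor)
  A new instance of Asynchronous.
- #queue_size ⇒ Object
  Number of locks currently held by the locker.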
Constructor Details
#initialize(store:, producers:, concurrency: 5, enqueue_seconds: 0.1, before_process_hooks: [], timers: []) ⇒ Asynchronous
Returns a new instance of Asynchronous.
# File 'lib/concurrent_pipeline/pipelines/processors/asynchronous.rb', line 22
def initialize(
  store:,
  producers:,
  concurrency: 5,
  enqueue_seconds: 0.1,
  before_process_hooks: [],
  timers: []
)
  @store = store
  @producers = producers
  @concurrency = concurrency
  @enqueue_seconds = enqueue_seconds
  @locker = Locker.new
  @errors = []
  @before_process_hooks = before_process_hooks
  @timers = timers
  @completed = 0
  @start_time = nil
end
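The keyword signature above can be tried out without the gem. The following is a minimal sketch, assuming a hypothetical stand-in class named ProcessorSketch that copies only the constructor's keywords and a few readers; it is not the library class. It shows that store: and producers: are required, that the remaining keywords fall back to their defaults, and that Ruby evaluates the `[]` defaults on every call, so instances do not share one array.

```ruby
# Hypothetical stand-in: mirrors the keyword signature only, none of the behaviour.
class ProcessorSketch
  attr_reader :concurrency, :enqueue_seconds, :before_process_hooks, :timers

  def initialize(store:, producers:, concurrency: 5, enqueue_seconds: 0.1,
                 before_process_hooks: [], timers: [])
    @store = store
    @producers = producers
    @concurrency = concurrency
    @enqueue_seconds = enqueue_seconds
    @before_process_hooks = before_process_hooks
    @timers = timers
  end
end

a = ProcessorSketch.new(store: :store_a, producers: [])
b = ProcessorSketch.new(store: :store_b, producers: [], concurrency: 2)

# Default arguments are evaluated per call, so this does not leak into b.
a.timers << :tick
```

Omitting store: or producers: raises ArgumentError, because neither keyword has a default.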
Instance Attribute Details
#before_process_hooks ⇒ Object (readonly)
Returns the value of attribute before_process_hooks.
# File 'lib/concurrent_pipeline/pipelines/processors/asynchronous.rb', line 12
def before_process_hooks
  @before_process_hooks
end
#concurrency ⇒ Object (readonly)
Returns the value of attribute concurrency.
# File 'lib/concurrent_pipeline/pipelines/processors/asynchronous.rb', line 12
def concurrency
  @concurrency
end
#enqueue_seconds ⇒ Object (readonly)
Returns the value of attribute enqueue_seconds.
# File 'lib/concurrent_pipeline/pipelines/processors/asynchronous.rb', line 12
def enqueue_seconds
  @enqueue_seconds
end
#errors ⇒ Object (readonly)
Returns the value of attribute errors.
# File 'lib/concurrent_pipeline/pipelines/processors/asynchronous.rb', line 12
def errors
  @errors
end
#locker ⇒ Object (readonly)
Returns the value of attribute locker.
# File 'lib/concurrent_pipeline/pipelines/processors/asynchronous.rb', line 12
def locker
  @locker
end
#producers ⇒ Object (readonly)
Returns the value of attribute producers.
# File 'lib/concurrent_pipeline/pipelines/processors/asynchronous.rb', line 12
def producers
  @producers
end
#store ⇒ Object (readonly)
Returns the value of attribute store.
# File 'lib/concurrent_pipeline/pipelines/processors/asynchronous.rb', line 12
def store
  @store
end
#timers ⇒ Object (readonly)
Returns the value of attribute timers.
# File 'lib/concurrent_pipeline/pipelines/processors/asynchronous.rb', line 12
def timers
  @timers
end
Class Method Details
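.call(...) ⇒ Object
Forwards all arguments to the constructor and returns the result of #call on the new instance.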
# File 'lib/concurrent_pipeline/pipelines/processors/asynchronous.rb', line 8
def self.call(...)
  new(...).call
end
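The `...` in the listing above is Ruby's argument forwarding (Ruby 2.7 and later): positional, keyword, and block arguments all pass straight through to new. As a minimal sketch of the same pattern, assuming a hypothetical Greeter class that has nothing to do with the library:

```ruby
# Hypothetical class illustrating the `self.call(...)` / `new(...).call` pattern.
class Greeter
  # Forward every argument to the constructor, then run the instance.
  def self.call(...)
    new(...).call
  end

  def initialize(name:, punctuation: "!")
    @name = name
    @punctuation = punctuation
  end

  def call
    "Hello, #{@name}#{@punctuation}"
  end
end

one_step  = Greeter.call(name: "pipeline")
two_steps = Greeter.new(name: "pipeline").call
```

Both expressions return the same string. The class-level form is a convenience for callers that never need to hold on to the instance.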
Instance Method Details
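#call ⇒ Object
Records the start time, then runs inside an Async block: starts one background task per timer, repeatedly enqueues work under a semaphore sized by concurrency, and sleeps enqueue_seconds between passes. The loop ends when errors is non-empty, or when no task is active and nothing new was enqueued. Timer tasks are always stopped on exit. Returns a Result built from errors.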
# File 'lib/concurrent_pipeline/pipelines/processors/asynchronous.rb', line 42
def call
  @start_time = Time.now

  Async { |task|
    semaphore = Async::Semaphore.new(concurrency)
    active_tasks = []

    # Start timer tasks
    timer_tasks = timers.map { |timer|
      task.async do
        loop do
          sleep(timer.interval)
          begin
            stats = build_stats
            timer.block.call(stats)
          rescue => e
            # Silently ignore timer errors to not break the pipeline
          end
        end
      end
    }

    begin
      loop do
        if errors.any?
          break
        end

        # Clean up finished tasks
        active_tasks.reject!(&:finished?)

        # Try to enqueue more work (only if no failure)
        enqueued_any = enqueue_all(semaphore, active_tasks, task)

        # Stop when nothing is being processed AND nothing new was enqueued
        break if active_tasks.empty? && !enqueued_any

        # Yield to allow other tasks to progress
        sleep(enqueue_seconds)
      end
    ensure
      # Stop all timer tasks
      timer_tasks.each(&:stop)
    end

    Result.new(errors:)
  }.wait # Wait for the async block to complete and return its value
end
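The control flow of the loop in #call can be illustrated with the standard library alone. The sketch below is not the library's implementation: it swaps Async tasks for plain threads and replaces Async::Semaphore with a simple size check, and the method name run_polling_loop, its parameters, and the job lambdas are all hypothetical. It keeps the same three decisions per pass: stop on any error, drop finished workers, and exit once nothing is active and nothing new was started.

```ruby
# Stdlib-only sketch of an enqueue/poll loop (threads stand in for Async tasks).
def run_polling_loop(jobs, concurrency: 2, poll_seconds: 0.01)
  pending = jobs.dup
  active  = []
  results = Queue.new # thread-safe collectors
  errors  = Queue.new

  loop do
    # Stop scheduling as soon as any worker has failed.
    break unless errors.empty?

    # Clean up finished workers.
    active.select!(&:alive?)

    # Start more work while there is spare capacity.
    enqueued_any = false
    while active.size < concurrency && (job = pending.shift)
      active << Thread.new(job) do |j|
        results << j.call
      rescue => e
        errors << e
      end
      enqueued_any = true
    end

    # Done when nothing is running and nothing new was started.
    break if active.empty? && !enqueued_any

    sleep(poll_seconds)
  end

  # Unlike the listing above, this sketch waits for in-flight workers before returning.
  active.each(&:join)

  {
    results: Array.new(results.size) { results.pop },
    errors:  Array.new(errors.size) { errors.pop }
  }
end

ok     = run_polling_loop([-> { 1 }, -> { 2 }, -> { 3 }])
failed = run_polling_loop([-> { raise "boom" }])
```

Results arrive in completion order, so sort them before comparing. The polling interval plays the role of enqueue_seconds: a smaller value reacts faster to finished work at the cost of more wake-ups.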
#queue_size ⇒ Object
Returns the number of locks currently held by the locker.
# File 'lib/concurrent_pipeline/pipelines/processors/asynchronous.rb', line 90
def queue_size
  locker.locks.size
end