Class: ConcurrentPipeline::Pipelines::Processors::Synchronous
- Inherits:
-
Object
- Object
- ConcurrentPipeline::Pipelines::Processors::Synchronous
- Defined in:
- lib/concurrent_pipeline/pipelines/processors/synchronous.rb
Instance Attribute Summary collapse
-
#before_process_hooks ⇒ Object
readonly
Returns the value of attribute before_process_hooks.
-
#errors ⇒ Object
readonly
Returns the value of attribute errors.
-
#locker ⇒ Object
readonly
Returns the value of attribute locker.
-
#producers ⇒ Object
readonly
Returns the value of attribute producers.
-
#store ⇒ Object
readonly
Returns the value of attribute store.
-
#timers ⇒ Object
readonly
Returns the value of attribute timers.
Class Method Summary collapse
Instance Method Summary collapse
- #call ⇒ Object
-
#initialize(store:, producers:, before_process_hooks: [], timers: []) ⇒ Synchronous
constructor
A new instance of Synchronous.
- #queue_size ⇒ Object
Constructor Details
#initialize(store:, producers:, before_process_hooks: [], timers: []) ⇒ Synchronous
Returns a new instance of Synchronous.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/concurrent_pipeline/pipelines/processors/synchronous.rb', line 19 def initialize( store:, producers:, before_process_hooks: [], timers: [] ) @store = store @producers = producers @locker = Locker.new @errors = [] @before_process_hooks = before_process_hooks @timers = timers @completed = 0 @start_time = nil end |
Instance Attribute Details
#before_process_hooks ⇒ Object (readonly)
Returns the value of attribute before_process_hooks.
11 12 13 |
# File 'lib/concurrent_pipeline/pipelines/processors/synchronous.rb', line 11 def before_process_hooks @before_process_hooks end |
#errors ⇒ Object (readonly)
Returns the value of attribute errors.
11 12 13 |
# File 'lib/concurrent_pipeline/pipelines/processors/synchronous.rb', line 11 def errors @errors end |
#locker ⇒ Object (readonly)
Returns the value of attribute locker.
11 12 13 |
# File 'lib/concurrent_pipeline/pipelines/processors/synchronous.rb', line 11 def locker @locker end |
#producers ⇒ Object (readonly)
Returns the value of attribute producers.
11 12 13 |
# File 'lib/concurrent_pipeline/pipelines/processors/synchronous.rb', line 11 def producers @producers end |
#store ⇒ Object (readonly)
Returns the value of attribute store.
11 12 13 |
# File 'lib/concurrent_pipeline/pipelines/processors/synchronous.rb', line 11 def store @store end |
#timers ⇒ Object (readonly)
Returns the value of attribute timers.
11 12 13 |
# File 'lib/concurrent_pipeline/pipelines/processors/synchronous.rb', line 11 def timers @timers end |
Class Method Details
.call ⇒ Object
7 8 9 |
# File 'lib/concurrent_pipeline/pipelines/processors/synchronous.rb', line 7 def self.call(...) new(...).call end |
Instance Method Details
#call ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/concurrent_pipeline/pipelines/processors/synchronous.rb', line 35 def call @start_time = Time.now Async { |task| # Start timer tasks timer_tasks = timers.map { |timer| task.async do loop do sleep(timer.interval) begin stats = build_stats timer.block.call(stats) rescue => e # Silently ignore timer errors to not break the pipeline end end end } begin while(enqueue_all) do end Result.new(errors:) ensure timer_tasks.each(&:stop) end }.wait end |
#queue_size ⇒ Object
62 63 64 65 66 67 68 69 70 |
# File 'lib/concurrent_pipeline/pipelines/processors/synchronous.rb', line 62 def queue_size # Cannot use locker.locks because don't enqueue them all, just # process records one-by-one. producers.sum do |producer| producer.records(store).count do |record| !locker.locked?(producer:, record:) end end end |