Class: ConcurrentPipeline::Pipelines::Schema
- Inherits:
-
Object
- Object
- ConcurrentPipeline::Pipelines::Schema
- Defined in:
- lib/concurrent_pipeline/pipelines/schema.rb
Defined Under Namespace
Classes: Producer
Constant Summary collapse
- PROCESSORS =
{ sync: Processors::Synchronous, async: Processors::Asynchronous, }
Instance Method Summary collapse
- #build_processor(store) ⇒ Object
- #process(query_or_record_name, **filters, &block) ⇒ Object
- #processor(type, **attrs) ⇒ Object
- #producers ⇒ Object
Instance Method Details
#build_processor(store) ⇒ Object
38 39 40 41 42 |
# File 'lib/concurrent_pipeline/pipelines/schema.rb', line 38 def build_processor(store) PROCESSORS .fetch(@processor.fetch(:type)) .new(store:, producers:, **@processor.fetch(:attrs)) end |
#process(query_or_record_name, **filters, &block) ⇒ Object
44 45 46 47 48 49 50 51 52 53 |
# File 'lib/concurrent_pipeline/pipelines/schema.rb', line 44 def process(query_or_record_name, **filters, &block) if query_or_record_name.is_a?(Proc) # Lambda-based query (current behavior) producers << Producer.new(query: query_or_record_name, block:) else # Record name with filters query = { record_name: query_or_record_name, filters: } producers << Producer.new(query:, block:) end end |
#processor(type, **attrs) ⇒ Object
34 35 36 |
# File 'lib/concurrent_pipeline/pipelines/schema.rb', line 34 def processor(type, **attrs) @processor = {type:, attrs:} end |
#producers ⇒ Object
30 31 32 |
# File 'lib/concurrent_pipeline/pipelines/schema.rb', line 30 def producers @producers ||= [] end |