Class: ConcurrentPipeline::Pipelines::Schema

Inherits:
Object
  • Object
show all
Defined in:
lib/concurrent_pipeline/pipelines/schema.rb

Defined Under Namespace

Classes: Producer

Constant Summary collapse

# Maps the processor type symbol given to #processor onto the class that
# #build_processor instantiates. Frozen so the lookup table cannot be
# mutated after load.
PROCESSORS =
{
  sync: Processors::Synchronous,
  async: Processors::Asynchronous,
}.freeze

Instance Method Summary collapse

Instance Method Details

#build_processor(store) ⇒ Object



38
39
40
41
42
# File 'lib/concurrent_pipeline/pipelines/schema.rb', line 38

# Instantiates the processor class registered in PROCESSORS under the
# configured type.
#
# @param store [Object] forwarded to the processor as the +store:+ keyword
# @return [Object] a new instance of the looked-up processor class, built
#   with +store:+, +producers:+ and the configured attrs as keywords
# @raise [KeyError] if the configured type is not a key of PROCESSORS, or if
#   @processor lacks the +:type+ / +:attrs+ keys
def build_processor(store)
  processor_class = PROCESSORS.fetch(@processor.fetch(:type))
  # Configured attrs are splatted last, after the two fixed keywords.
  processor_class.new(store: store, producers: producers, **@processor.fetch(:attrs))
end

#process(query_or_record_name, **filters, &block) ⇒ Object



44
45
46
47
48
49
50
51
52
53
# File 'lib/concurrent_pipeline/pipelines/schema.rb', line 44

# Registers a producer for the given query.
#
# @param query_or_record_name [Proc, Object] a Proc is stored as the query
#   as-is; any other value is treated as a record name and wrapped together
#   with +filters+ into a query hash
# @param filters [Hash] keyword filters; only used when a record name is given
# @param block [Proc, nil] block stored on the producer
# @return [Array] the producers list after the new Producer is appended
def process(query_or_record_name, **filters, &block)
  query =
    if query_or_record_name.is_a?(Proc)
      # Callable query: stored untouched (filters are not consulted here).
      query_or_record_name
    else
      # Record-name query: pair the name with whatever filters were passed.
      { record_name: query_or_record_name, filters: filters }
    end

  producers << Producer.new(query: query, block: block)
end

#processor(type, **attrs) ⇒ Object



34
35
36
# File 'lib/concurrent_pipeline/pipelines/schema.rb', line 34

# Records which processor to build and the keyword attributes to build it with.
#
# @param type [Object] stored under +:type+
# @param attrs [Hash] keyword arguments, stored under +:attrs+
# @return [Hash] the stored configuration, +{ type:, attrs: }+
def processor(type, **attrs)
  config = { type: type, attrs: attrs }
  @processor = config
end

#producers ⇒ Object



30
31
32
# File 'lib/concurrent_pipeline/pipelines/schema.rb', line 30

# Returns the list of registered producers, creating an empty list on
# first access.
#
# @return [Array] the same array object on every call
def producers
  @producers = [] unless @producers
  @producers
end