Class: Deepsearch::Engine::Pipeline
- Inherits: Object
- Defined in: lib/deepsearch/engine/pipeline.rb
Overview
Orchestrates the entire multi-step search and summarization process. The pipeline executes a sequence of steps:
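- Prepares sub-queries from the initial query (Steps::PrepareSubqueries).
- Performs parallel searches to gather website links (Steps::ParallelSearch).
- Aggregates and parses the content of the websites found (Steps::DataAggregation).
- Uses retrieval-augmented generation (RAG) to select text chunks relevant to the query (Steps::Rag).
- Summarizes the relevant chunks into a final answer (Steps::Summarization).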
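Each step runs inside retry logic (with_retry) to make the pipeline more robust against transient failures, and its result is reported to a listener through notify_listener(:step_completed, ...).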
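The with_retry helper is private and its body is not shown on this page, so the attempt count, delay policy, and rescued exception classes are unknown. The following is a minimal sketch of what a per-step retry wrapper might look like, assuming a fixed attempt budget and optional exponential backoff; the parameter names max_attempts and base_delay are hypothetical.

```ruby
# Hypothetical stand-in for the pipeline's private with_retry helper.
# The real attempt count, delay, and rescued exception classes are not
# shown on this page; max_attempts and base_delay are assumptions.
def with_retry(max_attempts: 3, base_delay: 0.0)
  attempts = 0
  begin
    attempts += 1
    yield
  rescue StandardError
    # Give up and re-raise the last error once the budget is spent.
    raise if attempts >= max_attempts

    # Assumed exponential backoff; skipped when base_delay is 0.
    sleep(base_delay * (2**(attempts - 1))) if base_delay.positive?
    retry
  end
end

# Usage: a block that fails twice, then succeeds on the third attempt.
calls = 0
result = with_retry do
  calls += 1
  raise "transient failure" if calls < 3
  :ok
end
p [result, calls] # => [:ok, 3]
```

Because attempts is declared outside the begin block, Ruby's retry keyword re-runs only the block body while the counter keeps its value, which is what bounds the number of attempts.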
Instance Method Summary
- #execute(query, **options) ⇒ Object
  Runs all pipeline steps for query and returns the summarization step's result.
- #initialize(search_adapter) ⇒ Pipeline (constructor)
  A new instance of Pipeline.
Constructor Details
#initialize(search_adapter) ⇒ Pipeline
Returns a new instance of Pipeline. The search_adapter is stored and later handed to the parallel search step by #execute.
```ruby
# File 'lib/deepsearch/engine/pipeline.rb', line 20

def initialize(search_adapter)
  @search_adapter = search_adapter
end
```
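The constructor only stores the adapter. In #execute, the adapter, the cleaned query, and the sub-queries are collected into one keyword hash for Steps::ParallelSearch::Process, with the caller's options splatted in last. In a Ruby hash literal a later key wins, so a caller-supplied key overrides the pipeline's own value. The helper build_search_options and the max_results option below are hypothetical, used only to show that precedence.

```ruby
# Hypothetical helper mirroring the hash that #execute passes to
# Steps::ParallelSearch::Process.new. Keys from **options come last,
# so they take precedence over the pipeline's own values.
def build_search_options(cleaned_query, sub_queries, search_adapter, **options)
  {
    initial_query: cleaned_query,
    sub_queries: sub_queries,
    search_adapter: search_adapter,
    **options
  }
end

adapter = Object.new # stand-in for a real search adapter
opts = build_search_options("ruby gc", ["ruby gc tuning"], adapter, max_results: 5)
p opts[:max_results]   # => 5
p opts[:initial_query] # => "ruby gc"

# A caller-supplied key replaces the pipeline's value.
overridden = build_search_options("ruby gc", [], adapter, initial_query: "custom")
p overridden[:initial_query] # => "custom"
```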
Instance Method Details
#execute(query, **options) ⇒ Object
```ruby
# File 'lib/deepsearch/engine/pipeline.rb', line 24

def execute(query, **options)
  query_preprocessing_result = with_retry do
    Steps::PrepareSubqueries::Process.new(query).execute
  end
  notify_listener(:step_completed, step: :prepare_subqueries, result: query_preprocessing_result)
  # [query_preprocessing_result] Contains:
  # - cleaned_query [String] The sanitized version of the original query
  # - original_query [String] The unmodified input query
  # - sub_queries [Array<String>] Generated sub-queries (empty array on error)
  # - error [String, nil] Error message if processing failed

  options = {
    initial_query: query_preprocessing_result.cleaned_query,
    sub_queries: query_preprocessing_result.sub_queries,
    search_adapter: @search_adapter,
    **options
  }
  parallel_search_result = with_retry { Steps::ParallelSearch::Process.new(**options).execute }
  notify_listener(:step_completed, step: :parallel_search, result: parallel_search_result)
  # [parallel_search_result] Contains:
  # - websites [Array<ParallelSearch::Result>] Search results
  #   - ParallelSearch::Result objects with:
  #     - websites [Array<Hash#url>] Array of website URLs
  #     - success [Boolean] Whether the search succeeded
  #     - error [String, nil] Error message if the search failed

  data_aggregation_result = with_retry do
    Steps::DataAggregation::Process.new(
      websites: parallel_search_result.websites
    ).execute
  end
  notify_listener(:step_completed, step: :data_aggregation, result: data_aggregation_result)
  # [data_aggregation_result] Contains:
  # - parsed_websites [Array<DataAggregation::Result>]
  #   - DataAggregation::Result objects with:
  #     - url [String] Website URL
  #     - content [String] Parsed content from the website
  #     - success [Boolean] Whether the website was processed successfully
  #     - error [String, nil] Error message if processing failed

  rag_result = with_retry do
    Steps::Rag::Process.new(
      query: query_preprocessing_result.cleaned_query,
      parsed_websites: data_aggregation_result.parsed_websites
    ).execute
  end
  notify_listener(:step_completed, step: :rag, result: rag_result)
  # [rag_result] Contains:
  # - query [::Deepsearch::Engine::Steps::Rag::Values::Query]
  # - relevant_chunks [Array<::Deepsearch::Engine::Steps::Rag::Values::Chunk>]

  summarization_result = with_retry do
    Steps::Summarization::Process.new(
      query: rag_result.query,
      relevant_chunks: rag_result.relevant_chunks
    ).execute
  end
  notify_listener(:step_completed, step: :summarization, result: summarization_result)
  # [summarization_result] Contains:
  # - summary [String] The final answer with citations
  # - success [Boolean]
  # - error [String, nil]

  summarization_result
end
```
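After every step, #execute calls notify_listener(:step_completed, step:, result:). The method is private and the listener interface it targets is not documented on this page. The sketch below assumes a listener object with a hypothetical on_event(event, step:, result:) callback and shows how a caller could record step completions in order; StepRecorder, Notifying, and DemoPipeline are illustrative names, not part of Deepsearch.

```ruby
# Hypothetical listener that records step completions in order.
class StepRecorder
  attr_reader :events

  def initialize
    @events = []
  end

  # Assumed callback signature; the real listener interface may differ.
  def on_event(event, step:, result:)
    @events << [event, step, result]
  end
end

# Hypothetical mixin mirroring the pipeline's private notify_listener.
module Notifying
  attr_accessor :listener

  private

  # No-op when no listener is attached.
  def notify_listener(event, **payload)
    listener&.on_event(event, **payload)
  end
end

# Toy pipeline that emits one event per step name used by #execute.
class DemoPipeline
  include Notifying

  STEPS = %i[prepare_subqueries parallel_search data_aggregation rag summarization].freeze

  def run
    STEPS.each { |step| notify_listener(:step_completed, step: step, result: "#{step} done") }
  end
end

recorder = StepRecorder.new
pipeline = DemoPipeline.new
pipeline.listener = recorder
pipeline.run
p recorder.events.map { |_, step, _| step }
# => [:prepare_subqueries, :parallel_search, :data_aggregation, :rag, :summarization]
```

Passing the step's full result object, as #execute does, lets a listener inspect intermediate output (for example, the sub-queries) without the pipeline knowing anything about how that output is displayed or stored.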