Class: Deepsearch::Engine::Pipeline

Inherits: Object
Defined in:
lib/deepsearch/engine/pipeline.rb

Overview

Orchestrates the entire multi-step search and summarization process. The pipeline executes a sequence of steps:

  1. Prepares sub-queries from the initial query.

  2. Performs parallel searches to gather website links.

  3. Aggregates and parses content from the found websites.

  4. Uses RAG to find text chunks relevant to the query.

  5. Summarizes the relevant chunks into a final answer.

It includes retry logic for each step to enhance robustness.

Instance Method Summary

Constructor Details

#initialize(search_adapter) ⇒ Pipeline

Returns a new instance of Pipeline.



# File 'lib/deepsearch/engine/pipeline.rb', line 20

# Builds a pipeline bound to one search backend.
#
# The adapter is stored and later supplied as the default +search_adapter:+
# argument of the parallel search step when #execute runs.
#
# @param search_adapter [Object] backend handed to the parallel search step
def initialize(search_adapter)
  @search_adapter = search_adapter
end

Instance Method Details

#execute(query, **options) ⇒ Object



# File 'lib/deepsearch/engine/pipeline.rb', line 24

# Runs the five pipeline stages in order and returns the summarization result.
#
# Every stage runs inside +with_retry+. Once a stage returns, its result is
# reported through +notify_listener(:step_completed, step: ..., result: ...)+.
# Intermediate +success+/+error+ fields are not inspected here. Each stage
# receives whatever the previous one produced.
#
# Stage results, as described by the original inline notes:
# * prepare_subqueries: cleaned_query, original_query, sub_queries (empty on
#   error), error
# * parallel_search: websites, success, error
# * data_aggregation: parsed_websites (each with url and content), success,
#   error (whether aggregation succeeded)
# * rag: query, relevant_chunks
# * summarization: summary (final answer with citations), success, error
#
# @param query [String] the user's original query
# @param options [Hash] extra keyword arguments for the parallel search step.
#   They are merged last, so a key given here (e.g. +search_adapter:+)
#   overrides the value computed by the pipeline.
# @return [Object] the summarization step's result
def execute(query, **options)
  prepared = run_pipeline_step(:prepare_subqueries) do
    Steps::PrepareSubqueries::Process.new(query).execute
  end

  search_args = {
    initial_query: prepared.cleaned_query,
    sub_queries: prepared.sub_queries,
    search_adapter: @search_adapter
  }.merge(options)

  searched = run_pipeline_step(:parallel_search) do
    Steps::ParallelSearch::Process.new(**search_args).execute
  end

  aggregated = run_pipeline_step(:data_aggregation) do
    Steps::DataAggregation::Process.new(websites: searched.websites).execute
  end

  retrieved = run_pipeline_step(:rag) do
    Steps::Rag::Process.new(
      query: prepared.cleaned_query,
      parsed_websites: aggregated.parsed_websites
    ).execute
  end

  run_pipeline_step(:summarization) do
    Steps::Summarization::Process.new(
      query: retrieved.query,
      relevant_chunks: retrieved.relevant_chunks
    ).execute
  end
end

# Runs a single stage under +with_retry+ and reports its outcome.
#
# @param step_name [Symbol] value passed as +step:+ to +notify_listener+
# @yield the stage body to execute (forwarded to +with_retry+)
# @return [Object] whatever +with_retry+ returned for the stage
private def run_pipeline_step(step_name, &stage)
  with_retry(&stage).tap do |outcome|
    notify_listener(:step_completed, step: step_name, result: outcome)
  end
end