Class: ElasticGraph::GraphQL::DatastoreSearchRouter

Inherits: Object
Defined in:
lib/elastic_graph/graphql/datastore_search_router.rb

Overview

Responsible for routing datastore search requests to the appropriate cluster and index.

Instance Method Summary

Constructor Details

#initialize(datastore_clients_by_name:, logger:, monotonic_clock:, config:) ⇒ DatastoreSearchRouter

Returns a new instance of DatastoreSearchRouter.



# File 'lib/elastic_graph/graphql/datastore_search_router.rb', line 19

def initialize(
  datastore_clients_by_name:,
  logger:,
  monotonic_clock:,
  config:
)
  @datastore_clients_by_name = datastore_clients_by_name
  @logger = logger
  @monotonic_clock = monotonic_clock
  @config = config
end
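The `monotonic_clock` collaborator only needs to respond to `now_in_ms`, which `#msearch` below uses to measure the client-observed query duration. A minimal sketch of such a clock using Ruby's `Process::CLOCK_MONOTONIC` (which is immune to wall-clock adjustments, making it suitable for measuring durations); the class name `SketchMonotonicClock` is hypothetical and not part of ElasticGraph:

```ruby
# Hypothetical stand-in for the monotonic clock dependency; the real
# implementation lives elsewhere in ElasticGraph.
class SketchMonotonicClock
  def now_in_ms
    # Integer milliseconds on a clock that never jumps backwards.
    Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
  end
end

clock = SketchMonotonicClock.new
started_at = clock.now_in_ms
sleep 0.01 # stand-in for a datastore round trip
elapsed_ms = clock.now_in_ms - started_at
```

Depending on a monotonic clock via the constructor (rather than calling `Time.now` inline) also makes duration logic trivial to control in tests.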

Instance Method Details

#msearch(queries, query_tracker: QueryDetailsTracker.empty) ⇒ Object

Sends the datastore a multi-search request based on the given queries. Returns a hash of responses keyed by the query.



# File 'lib/elastic_graph/graphql/datastore_search_router.rb', line 33

def msearch(queries, query_tracker: QueryDetailsTracker.empty)
  DatastoreQuery.perform(queries) do |header_body_tuples_by_query|
    # Here we set a client-side timeout, which causes the client to give up and close the connection.
    # According to [1]--"We have a new way to cancel search requests efficiently from the client
    # in 7.4 (by closing the underlying http channel)"--this should cause the server to stop
    # executing the search, and more importantly, gives us a strictly enforced timeout.
    #
    # In addition, the datastore supports a `timeout` option on a search body, but this timeout is
    # "best effort", applies to each shard (and not to the overall search request), and only interrupts
    # certain kinds of operations. [2] and [3] below have more info.
    #
    # Note that I have not been able to observe this `timeout` on a search body ever working
    # as documented. In our test suite, none of the slow queries I have tried (both via
    # slow aggregation query and a slow script) have ever aborted early when that option is
    # set. In Kibana in production, @bsorbo observed it aborting a `search` request early
    # (but not necessarily an `msearch` request...), but even then, the response said `timed_out: false`!
    # Other people ([4]) have reported observing timeout having no effect on msearch requests.
    #
    # So, the client-side timeout is the main one we want here, and for now we are not using the
    # datastore search `timeout` option at all.
    #
    # For more info, see:
    #
    # [1] https://github.com/elastic/elasticsearch/issues/47716
    # [2] https://github.com/elastic/elasticsearch/pull/51858
    # [3] https://www.elastic.co/guide/en/elasticsearch/guide/current/_search_options.html#_timeout_2
    # [4] https://discuss.elastic.co/t/timeouts-ignored-in-multisearch/23673

    # Unfortunately, the Elasticsearch/OpenSearch clients don't support setting a per-request client-side timeout,
    # even though Faraday (the underlying HTTP client) does. To work around this, we pass our desired
    # timeout in a specific header that the `SupportTimeouts` Faraday middleware will use.
    headers = {TIMEOUT_MS_HEADER => msearch_request_timeout_from(queries)}.compact

    queries_and_header_body_tuples_by_datastore_client = header_body_tuples_by_query.group_by do |(query, header_body_tuples)|
      @datastore_clients_by_name.fetch(query.cluster_name)
    end

    datastore_query_started_at = @monotonic_clock.now_in_ms

    server_took_and_results = Support::Threading.parallel_map(queries_and_header_body_tuples_by_datastore_client) do |datastore_client, query_and_header_body_tuples_for_cluster|
      queries_for_cluster, header_body_tuples = query_and_header_body_tuples_for_cluster.transpose
      msearch_body = header_body_tuples.flatten(1)
      response = datastore_client.msearch(body: msearch_body, headers: headers)
      debug_query(query: msearch_body, response: response)
      ordered_responses = response.fetch("responses")
      [response["took"], queries_for_cluster.zip(ordered_responses)]
    end

    query_tracker.record_datastore_query_duration_ms(
      client: @monotonic_clock.now_in_ms - datastore_query_started_at,
      server: server_took_and_results.map(&:first).compact.max
    )

    server_took_and_results.flat_map(&:last).to_h.tap do |responses_by_query|
      log_shard_failure_if_necessary(responses_by_query)
      raise_search_failed_if_any_failures(responses_by_query)
    end
  end
end
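The data reshaping inside `#msearch` (group the per-query header/body tuples by cluster, then `transpose` and `flatten(1)` them into a single alternating header/body msearch payload per datastore client) can be sketched with plain hashes standing in for the real query objects. All of the data below is illustrative, not ElasticGraph API:

```ruby
# Each query maps to a [header, body] tuple, mirroring the shape
# `DatastoreQuery.perform` yields to the block.
header_body_tuples_by_query = {
  {name: "q1", cluster: "main"}  => [{index: "widgets"}, {query: {match_all: {}}}],
  {name: "q2", cluster: "main"}  => [{index: "parts"},   {query: {match_all: {}}}],
  {name: "q3", cluster: "other"} => [{index: "widgets"}, {query: {match_all: {}}}]
}

# Group the (query, tuple) pairs by cluster, as the router groups by
# datastore client.
grouped = header_body_tuples_by_query.group_by { |(query, _tuple)| query[:cluster] }

bodies_by_cluster = grouped.transform_values do |query_and_tuple_pairs|
  _queries, header_body_tuples = query_and_tuple_pairs.transpose
  # flatten(1) interleaves the tuples into the alternating
  # header, body, header, body... lines that msearch expects.
  header_body_tuples.flatten(1)
end

# The slowest per-cluster `took` wins as the reported server-side duration;
# `compact` drops clusters whose responses lacked a `took` value, just as
# `.compact` on the headers hash drops the timeout header when no timeout
# applies.
server_took_ms = [120, nil, 85].compact.max
```

Here `bodies_by_cluster["main"]` holds four alternating lines (two queries), `bodies_by_cluster["other"]` holds two, and `server_took_ms` is `120`.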