Class: ElasticGraph::GraphQL::DatastoreSearchRouter
- Inherits: Object (Object > ElasticGraph::GraphQL::DatastoreSearchRouter)
- Defined in: lib/elastic_graph/graphql/datastore_search_router.rb
Overview
Responsible for routing datastore search requests to the appropriate cluster and index.
Instance Method Summary
- #initialize(datastore_clients_by_name:, logger:, monotonic_clock:, config:) ⇒ DatastoreSearchRouter (constructor)
  A new instance of DatastoreSearchRouter.
- #msearch(queries, query_tracker: QueryDetailsTracker.empty) ⇒ Object
  Sends the datastore a multi-search request based on the given queries.
Constructor Details
#initialize(datastore_clients_by_name:, logger:, monotonic_clock:, config:) ⇒ DatastoreSearchRouter
Returns a new instance of DatastoreSearchRouter.
# File 'lib/elastic_graph/graphql/datastore_search_router.rb', line 19

def initialize(
  datastore_clients_by_name:,
  logger:,
  monotonic_clock:,
  config:
)
  @datastore_clients_by_name = datastore_clients_by_name
  @logger = logger
  @monotonic_clock = monotonic_clock
  @config = config
end
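The constructor only stores its collaborators, which keeps them easy to substitute. Within msearch, the only method called on monotonic_clock is now_in_ms, used to measure the client-side duration of the request. As a hedged sketch (SketchMonotonicClock and FakeClock are hypothetical names, not ElasticGraph classes), an object backed by Ruby's Process.clock_gettime with CLOCK_MONOTONIC would satisfy that use, and a fake clock makes the elapsed-time arithmetic deterministic in tests:

```ruby
# Hypothetical clock exposing the one method msearch calls: now_in_ms.
class SketchMonotonicClock
  # Integer milliseconds from a monotonic source, which (unlike Time.now)
  # does not jump when the system wall clock is adjusted.
  def now_in_ms
    Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
  end
end

# Hypothetical fake clock for tests: time only moves when told to.
class FakeClock
  attr_reader :now_in_ms

  def initialize(now_in_ms = 0)
    @now_in_ms = now_in_ms
  end

  def advance(ms)
    @now_in_ms += ms
  end
end

# Measuring elapsed time the same way msearch does (end minus start).
fake = FakeClock.new(1_000)
started_at = fake.now_in_ms
fake.advance(25)
elapsed = fake.now_in_ms - started_at # => 25
```

A monotonic source matters for this subtraction: with wall-clock time, an NTP adjustment between the two readings could yield a negative or inflated duration.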
Instance Method Details
#msearch(queries, query_tracker: QueryDetailsTracker.empty) ⇒ Object
Sends the datastore a multi-search request based on the given queries. Returns a hash of responses keyed by the query.
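The routing performed by msearch can be illustrated in isolation. This is a minimal, self-contained sketch, not ElasticGraph's implementation: SketchQuery, StubClient and route_msearch are hypothetical names, plain Threads stand in for Support::Threading.parallel_map, and each query is assumed to map to exactly one [header, body] pair (which is what allows zip to line responses back up with their queries). It groups queries by the client for their cluster, sends one msearch per cluster concurrently, zips each cluster's ordered responses back to its queries, and takes the largest server-reported took as the overall server time.

```ruby
# Hypothetical stand-ins for a datastore query and a datastore client.
SketchQuery = Struct.new(:name, :cluster_name)

class StubClient
  def initialize(took)
    @took = took
  end

  # Mimics an msearch response: one entry per header/body pair, in request order.
  def msearch(body:, headers:)
    responses = body.each_slice(2).map { |_header, search_body| {"hits" => search_body} }
    {"took" => @took, "responses" => responses}
  end
end

# Returns [responses_by_query, max_server_took_ms].
def route_msearch(header_body_tuples_by_query, clients_by_name)
  # Group the [query, [header, body]] entries by the client for each query's cluster.
  by_client = header_body_tuples_by_query.group_by do |(query, _tuple)|
    clients_by_name.fetch(query.cluster_name)
  end

  # One thread per cluster (standing in for Support::Threading.parallel_map).
  threads = by_client.map do |client, entries|
    Thread.new do
      queries, tuples = entries.transpose
      body = tuples.flatten(1) # [header1, body1, header2, body2, ...]
      response = client.msearch(body: body, headers: {})
      # Responses come back in request order, so zip restores the query keys.
      [response["took"], queries.zip(response.fetch("responses"))]
    end
  end
  results = threads.map(&:value) # Thread#value joins and returns the block's result

  [results.flat_map(&:last).to_h, results.map(&:first).compact.max]
end
```

For example, queries for clusters "main" and "other" produce two msearch calls, one per client; the combined hash is keyed by the original query objects regardless of which cluster served them.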
# File 'lib/elastic_graph/graphql/datastore_search_router.rb', line 33

def msearch(queries, query_tracker: QueryDetailsTracker.empty)
  DatastoreQuery.perform(queries) do |header_body_tuples_by_query|
    # Here we set a client-side timeout, which causes the client to give up and close the connection.
    # According to [1]--"We have a new way to cancel search requests efficiently from the client
    # in 7.4 (by closing the underlying http channel)"--this should cause the server to stop
    # executing the search, and more importantly, gives us a strictly enforced timeout.
    #
    # In addition, the datastore supports a `timeout` option on a search body, but this timeout is
    # "best effort", applies to each shard (and not to the overall search request), and only interrupts
    # certain kinds of operations. [2] and [3] below have more info.
    #
    # Note that I have not been able to observe this `timeout` on a search body ever working
    # as documented. In our test suite, none of the slow queries I have tried (both via
    # slow aggregation query and a slow script) have ever aborted early when that option is
    # set. In Kibana in production, @bsorbo observed it aborting a `search` request early
    # (but not necessarily an `msearch` request...), but even then, the response said `timed_out: false`!
    # Other people ([4]) have reported observing timeout having no effect on msearch requests.
    #
    # So, the client-side timeout is the main one we want here, and for now we are not using the
    # datastore search `timeout` option at all.
    #
    # For more info, see:
    #
    # [1] https://github.com/elastic/elasticsearch/issues/47716
    # [2] https://github.com/elastic/elasticsearch/pull/51858
    # [3] https://www.elastic.co/guide/en/elasticsearch/guide/current/_search_options.html#_timeout_2
    # [4] https://discuss.elastic.co/t/timeouts-ignored-in-multisearch/23673

    # Unfortunately, the Elasticsearch/OpenSearch clients don't support setting a per-request client-side timeout,
    # even though Faraday (the underlying HTTP client) does. To work around this, we pass our desired
    # timeout in a specific header that the `SupportTimeouts` Faraday middleware will use.
    headers = {TIMEOUT_MS_HEADER => msearch_request_timeout_from(queries)}.compact

    queries_and_header_body_tuples_by_datastore_client = header_body_tuples_by_query.group_by do |(query, header_body_tuples)|
      @datastore_clients_by_name.fetch(query.cluster_name)
    end

    datastore_query_started_at = @monotonic_clock.now_in_ms

    server_took_and_results = Support::Threading.parallel_map(queries_and_header_body_tuples_by_datastore_client) do |datastore_client, query_and_header_body_tuples_for_cluster|
      queries_for_cluster, header_body_tuples = query_and_header_body_tuples_for_cluster.transpose
      msearch_body = header_body_tuples.flatten(1)
      response = datastore_client.msearch(body: msearch_body, headers: headers)
      debug_query(query: msearch_body, response: response)
      ordered_responses = response.fetch("responses")
      [response["took"], queries_for_cluster.zip(ordered_responses)]
    end

    query_tracker.record_datastore_query_duration_ms(
      client: @monotonic_clock.now_in_ms - datastore_query_started_at,
      server: server_took_and_results.map(&:first).compact.max
    )

    server_took_and_results.flat_map(&:last).to_h.tap do |responses_by_query|
      log_shard_failure_if_necessary(responses_by_query)
      raise_search_failed_if_any_failures(responses_by_query)
    end
  end
end
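The source comments describe passing the per-request timeout out-of-band in a header that the SupportTimeouts Faraday middleware picks up. The sketch below only illustrates that general pattern and is not the real middleware: it does not use Faraday, TIMEOUT_HEADER is a hypothetical placeholder (the actual value of TIMEOUT_MS_HEADER is not shown on this page), and HeaderTimeoutMiddleware and SketchEnv are hypothetical names loosely modeled on a Rack/Faraday-style call(env) stack. It also shows what the .compact call buys: if the computed timeout is nil, the header is omitted entirely.

```ruby
# Hypothetical header name; not the real value of TIMEOUT_MS_HEADER.
TIMEOUT_HEADER = "X-Sketch-Timeout-Ms"

# Hypothetical request env: headers plus a per-request options hash.
SketchEnv = Struct.new(:request_headers, :request_options)

# Hypothetical middleware: moves the timeout from the header into the
# per-request options. This sketch also strips the header so it is not
# sent to the server; whether the real middleware does so is not shown here.
class HeaderTimeoutMiddleware
  def initialize(app)
    @app = app
  end

  def call(env)
    if (timeout_ms = env.request_headers.delete(TIMEOUT_HEADER))
      env.request_options[:timeout] = Integer(timeout_ms) / 1000.0 # seconds
    end
    @app.call(env)
  end
end

inner = ->(env) { env } # stand-in for the HTTP adapter; returns the env unchanged
stack = HeaderTimeoutMiddleware.new(inner)

with_timeout = stack.call(SketchEnv.new({TIMEOUT_HEADER => 250}.compact, {}))
with_timeout.request_options[:timeout] # => 0.25

# A nil timeout is dropped by .compact, so no header and no timeout option are set.
without_timeout = stack.call(SketchEnv.new({TIMEOUT_HEADER => nil}.compact, {}))
without_timeout.request_options # => {}
```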