Class: ElasticGraph::Indexer::DatastoreIndexingRouter
- Inherits: Object
- Defined in: lib/elastic_graph/indexer/datastore_indexing_router.rb
Overview
Responsible for routing datastore indexing requests to the appropriate cluster and index.
Defined Under Namespace
Classes: BulkResult
Constant Summary
- MAPPING_CACHE_MAX_AGE_IN_MS_RANGE =
In this class, we internally cache the datastore mapping for an index definition, so that we don’t have to fetch the mapping from the datastore on each call to `bulk`. It rarely changes and ElasticGraph is designed so that mapping updates are applied before deploying the indexer with a new mapping.
However, if an engineer forgets to apply a mapping update before deploying, they’ll run into “mappings are incomplete” errors. They can update the mapping to fix it, but the use of caching in this class could mean that the fix doesn’t necessarily take effect right away. The app would have to be deployed or restarted so that the caches are cleared. That could be annoying.
To address this issue, we’re adding an expiration on the caching of the index mappings. Re-fetching the index mapping once every few minutes is no big deal and will allow the indexer to recover on its own after a mapping update has been applied without requiring a deploy or a restart.
The expiration is a range so that, when we have many processes running that all started around the same time (say, after a deploy!), they don’t all expire their caches in sync, leading to spiky load on the datastore. Instead, the random distribution of expiration times will spread out the load.
(5 * 60 * 1000)..(10 * 60 * 1000)
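As a hedged illustration of how a randomized expiration within this range staggers cache refreshes across processes (the `CachedMapping` struct and `fetch_mapping_from_datastore` helper below are hypothetical names, not the actual private implementation; it assumes the injected monotonic clock exposes `now_in_ms`):

CachedMapping = ::Struct.new(:mapping, :expires_at_ms)

def cached_mapping_for(index_def_name)
  cached = @cached_mappings[index_def_name]

  if cached.nil? || @monotonic_clock.now_in_ms > cached.expires_at_ms
    # Pick a random expiration within the range so that many processes started
    # together (e.g. right after a deploy) don't all re-fetch at the same moment.
    expires_at_ms = @monotonic_clock.now_in_ms + rand(MAPPING_CACHE_MAX_AGE_IN_MS_RANGE)
    cached = CachedMapping.new(fetch_mapping_from_datastore(index_def_name), expires_at_ms)
    @cached_mappings[index_def_name] = cached
  end

  cached.mapping
end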
Instance Method Summary
-
#bulk(operations, refresh: false) ⇒ Object
Proxies `client#bulk` by converting `operations` to their bulk form.
-
#initialize(datastore_clients_by_name:, mappings_by_index_def_name:, monotonic_clock:, logger:) ⇒ DatastoreIndexingRouter
constructor
A new instance of DatastoreIndexingRouter.
-
#source_event_versions_in_index(operations) ⇒ Object
Given a list of operations (which can contain different types of operations!), queries the datastore to identify the source event versions stored on the corresponding documents.
-
#validate_mapping_completeness_of!(index_cluster_name_method, *index_definitions) ⇒ Object
Queries the datastore mapping(s) for the given index definition(s) to verify that they are up-to-date with our schema artifacts, raising an error if the datastore mappings are missing fields that we expect.
Constructor Details
#initialize(datastore_clients_by_name:, mappings_by_index_def_name:, monotonic_clock:, logger:) ⇒ DatastoreIndexingRouter
Returns a new instance of DatastoreIndexingRouter.
# File 'lib/elastic_graph/indexer/datastore_indexing_router.rb', line 39

def initialize(
  datastore_clients_by_name:,
  mappings_by_index_def_name:,
  monotonic_clock:,
  logger:
)
  @datastore_clients_by_name = datastore_clients_by_name
  @logger = logger
  @monotonic_clock = monotonic_clock
  @cached_mappings = {}

  @mappings_by_index_def_name = mappings_by_index_def_name.transform_values do |mappings|
    DatastoreCore::IndexConfigNormalizer.normalize_mappings(mappings)
  end
end
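For illustration, construction might look like the sketch below. This is hypothetical wiring; in a real deployment these collaborators are provided by ElasticGraph’s bootstrapping code, and `datastore_client`, `widgets_mapping`, and `monotonic_clock` are placeholder names.

require "logger"

router = ElasticGraph::Indexer::DatastoreIndexingRouter.new(
  datastore_clients_by_name: {"main" => datastore_client},    # cluster name => datastore client
  mappings_by_index_def_name: {"widgets" => widgets_mapping}, # index def name => mapping hash
  monotonic_clock: monotonic_clock,                           # used for mapping cache expiration
  logger: Logger.new($stdout)
)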
Instance Method Details
#bulk(operations, refresh: false) ⇒ Object
Proxies `client#bulk` by converting `operations` to their bulk form. Returns a hash mapping each cluster to the list of operations successfully applied on that cluster.
For each operation, 1 of 4 things will happen, each of which will be treated differently:
1. The operation was successfully applied to the datastore and updated its state.
The operation will be included in the successful operations of the returned result.
2. The operation could not even be attempted. For example, an `Update` operation
cannot be attempted when the source event has `nil` for the field used as the source of
the destination type's id. The returned result will not include this operation.
3. The operation was a no-op due to the external version not increasing. This happens when we
process a duplicate or out-of-order event. The operation will be included in the returned
result's list of noop results.
4. The operation failed outright for some other reason. The operation will be included in the
returned result's failure results.
It is the caller’s responsibility to deal with any returned failures as this method does not raise an exception in that case.
Note: before any operations are attempted, the datastore indices are validated for consistency with the mappings we expect, meaning that no bulk operations will be attempted if the mappings are not up-to-date.
# File 'lib/elastic_graph/indexer/datastore_indexing_router.rb', line 76

def bulk(operations, refresh: false)
  # Before writing these operations, verify their destination index mappings are consistent.
  validate_mapping_completeness_of!(:accessible_cluster_names_to_index_into, *operations.map(&:destination_index_def).uniq)

  # @type var ops_by_client: ::Hash[DatastoreCore::_Client, ::Array[_Operation]]
  ops_by_client = ::Hash.new { |h, k| h[k] = [] }
  # @type var unsupported_ops: ::Set[_Operation]
  unsupported_ops = ::Set.new

  operations.reject { |op| op.to_datastore_bulk.empty? }.each do |op|
    # Note: this intentionally does not use `accessible_cluster_names_to_index_into`.
    # We want to fail with a clear error if any clusters are inaccessible instead of silently
    # ignoring the named cluster. The `IndexingFailuresError` provides a clear error.
    cluster_names = op.destination_index_def.clusters_to_index_into

    cluster_names.each do |cluster_name|
      if (client = @datastore_clients_by_name[cluster_name])
        ops_by_client[client] << op
      else
        unsupported_ops << op
      end
    end

    unsupported_ops << op if cluster_names.empty?
  end

  unless unsupported_ops.empty?
    raise IndexingFailuresError,
      "The index definitions for #{unsupported_ops.size} operations " \
      "(#{unsupported_ops.map { |o| Indexer::EventID.from_event(o.event) }.join(", ")}) " \
      "were configured to be inaccessible. Check the configuration, or avoid sending " \
      "events of this type to this ElasticGraph indexer."
  end

  ops_and_results_by_cluster = Support::Threading.parallel_map(ops_by_client) do |(client, ops)|
    responses = client.bulk(body: ops.flat_map(&:to_datastore_bulk), refresh: refresh).fetch("items")

    # As per https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-api-response-body,
    # > `items` contains the result of each operation in the bulk request, in the order they were submitted.
    # Thus, we can trust it has the same cardinality as `ops` and they can be zipped together.
    ops_and_results = ops.zip(responses).map { |(op, response)| [op, op.categorize(response)] }

    [client.cluster_name, ops_and_results]
  end.to_h

  BulkResult.new(ops_and_results_by_cluster)
end
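A hedged usage sketch follows. The `BulkResult` accessors shown here (`successful_operations`, `failure_results`) are assumed names for illustration and may differ from the actual class.

result = router.bulk(operations, refresh: true)

# Assumed accessors on BulkResult -- illustrative only.
successful = result.successful_operations
failures = result.failure_results

# The router does not raise for per-operation failures (outcome 4 above),
# so the caller is responsible for handling them.
unless failures.empty?
  raise "#{failures.size} operation(s) failed to index"
end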
#source_event_versions_in_index(operations) ⇒ Object
Given a list of operations (which can contain different types of operations!), queries the datastore to identify the source event versions stored on the corresponding documents.
This was specifically designed to support dealing with malformed events. If an event is malformed we usually want to raise an exception, but if the document targeted by the malformed event is at a newer version in the index than the version number in the event, the malformed state of the event has already been superseded by a corrected event and we can just log a message instead. This method specifically supports that logic.
If the datastore returns errors for any of the calls, this method will raise an exception. Otherwise, this method returns a nested hash:
- The outer hash maps operations to an inner hash of results for that operation.
- The inner hash maps datastore cluster/client names to the version number for that operation from the datastore cluster.
Note that the returned `version` for an operation on a cluster can be `nil` (as when the document is not found, or for an operation type that doesn’t store source versions).
This nested structure is necessary because a single operation can target more than one datastore cluster, and a document may have different source event versions in different datastore clusters.
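For example, the returned structure might look like this (a sketch with invented values; per the implementation below, the inner values are lists of the versions found):

results = router.source_event_versions_in_index(operations)
# => {
#      update_op_a => {"main" => [4], "replica" => [4]},
#      update_op_b => {"main" => []}  # document not found on that cluster
#    }

results.each do |op, versions_by_cluster_name|
  versions_by_cluster_name.each do |cluster_name, versions|
    puts "#{op.doc_id} on #{cluster_name}: #{versions.inspect}"
  end
end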
# File 'lib/elastic_graph/indexer/datastore_indexing_router.rb', line 189

def source_event_versions_in_index(operations)
  ops_by_client_name = operations.each_with_object(::Hash.new { |h, k| h[k] = [] }) do |op, ops_hash|
    # Note: this intentionally does not use `accessible_cluster_names_to_index_into`.
    # We want to fail with a clear error if any clusters are inaccessible instead of silently
    # ignoring the named cluster. The `IndexingFailuresError` provides a clear error.
    cluster_names = op.destination_index_def.clusters_to_index_into
    cluster_names.each { |cluster_name| ops_hash[cluster_name] << op }
  end

  client_names_and_results = Support::Threading.parallel_map(ops_by_client_name) do |(client_name, all_ops)|
    ops, unversioned_ops = all_ops.partition(&:versioned?)

    msearch_response =
      if (client = @datastore_clients_by_name[client_name]) && ops.any?
        body = ops.flat_map do |op|
          # We only care about the source versions, but the way we get it varies.
          include_version =
            if op.destination_index_def.use_updates_for_indexing?
              {_source: {includes: [
                "__versions.#{op.update_target.relationship}",
                # The update_data script before ElasticGraph v0.8 used __sourceVersions[type] instead of
                # __versions[relationship]. To be backwards-compatible we need to fetch the data at both paths.
                #
                # TODO: Drop this when we no longer need to maintain backwards-compatibility.
                "__sourceVersions.#{op.event.fetch("type")}"
              ]}}
            else
              {version: true, _source: false}
            end

          [
            # Note: we intentionally search the entire index expression, not just an individual index based
            # on a rollover timestamp. And we intentionally do NOT provide a routing value--we want to find
            # the version, no matter what shard the document lives on.
            #
            # Since this `source_event_versions_in_index` is for handling malformed events, it's possible that the
            # rollover timestamp or routing value on the operation is wrong and that the correct document lives in
            # a different shard and index than what the operation is targeted at. We want to search across all of
            # them so that we will find it, regardless of where it lives.
            {index: op.destination_index_def.index_expression_for_search},
            # Filter to the documents matching the id.
            {query: {ids: {values: [op.doc_id]}}}.merge(include_version)
          ]
        end

        client.msearch(body: body)
      else
        # The named client doesn't exist, so we don't have any versions for the docs.
        {"responses" => ops.map { |op| {"hits" => {"hits" => _ = []}} }}
      end

    errors = msearch_response.fetch("responses").select { |res| res["error"] }

    if errors.empty?
      versions_by_op = ops.zip(msearch_response.fetch("responses")).to_h do |(op, response)|
        hits = response.fetch("hits").fetch("hits")

        if hits.size > 1
          # Got multiple results. The document is duplicated in multiple shards or indexes. Log a warning about this.
          @logger.warn({
            "message_type" => "IdentifyDocumentVersionsGotMultipleResults",
            "index" => hits.map { |h| h["_index"] },
            "routing" => hits.map { |h| h["_routing"] },
            "id" => hits.map { |h| h["_id"] },
            "version" => hits.map { |h| h["_version"] }
          })
        end

        if op.destination_index_def.use_updates_for_indexing?
          versions = hits.filter_map do |hit|
            hit.dig("_source", "__versions", op.update_target.relationship, hit.fetch("_id")) ||
              # The update_data script before ElasticGraph v0.8 used __sourceVersions[type] instead of
              # __versions[relationship]. To be backwards-compatible we need to fetch the data at both paths.
              #
              # TODO: Drop this when we no longer need to maintain backwards-compatibility.
              hit.dig("_source", "__sourceVersions", op.event.fetch("type"), hit.fetch("_id"))
          end

          [op, versions.uniq]
        else
          [op, hits.map { |h| h.fetch("_version") }.uniq]
        end
      end

      unversioned_ops_hash = unversioned_ops.to_h do |op|
        [op, []] # : [_Operation, ::Array[::Integer]]
      end

      [client_name, :success, versions_by_op.merge(unversioned_ops_hash)]
    else
      [client_name, :failure, errors]
    end
  end

  failures = client_names_and_results.flat_map do |(client_name, success_or_failure, results)|
    if success_or_failure == :success
      []
    else
      results.map do |result|
        "From cluster #{client_name}: #{::JSON.generate(result, space: " ")}"
      end
    end
  end

  if failures.empty?
    client_names_and_results.each_with_object(_ = {}) do |(client_name, _success_or_failure, results), accum|
      results.each do |op, version|
        accum[op] ||= _ = {}
        accum[op][client_name] = version
      end
    end
  else
    raise IdentifyDocumentVersionsFailedError, "Got #{failures.size} failure(s) while querying the datastore " \
      "for document versions:\n\n#{failures.join("\n")}"
  end
end
#validate_mapping_completeness_of!(index_cluster_name_method, *index_definitions) ⇒ Object
Queries the datastore mapping(s) for the given index definition(s) to verify that they are up-to-date with our schema artifacts, raising an error if the datastore mappings are missing fields that we expect. (Extra fields are allowed, though; we’ll just ignore them.)
This is intended for use when you want a strong guarantee before proceeding that the indices are current, such as before indexing data, or after applying index updates (to “prove” that everything is how it should be).
This correctly queries the datastore clusters specified via `index_into_clusters` in config, but ignores clusters specified via `query_cluster` (since this isn’t intended to be used as part of the query flow).
For a rollover template, this takes care of verifying the template itself and also any indices that originated from the template.
Note also that this method caches the datastore mappings, since it is intended to be used to verify an index before we index data into it, and we do not want to impose a huge performance penalty on that process (multiple datastore requests before each document is indexed…). In general, the index mapping only changes when we make it change, and we deploy and restart ElasticGraph after any index mapping change, so we do not need to worry about the mapping mutating during the lifetime of a single process (particularly given the expense of re-fetching it).
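A hedged sketch of calling this before indexing, mirroring what `bulk` does internally; `index_definitions` is an assumed collection of index definitions from your schema artifacts, and the rescued `ConfigError` constant is shown unqualified since its exact namespace depends on the ElasticGraph version.

begin
  router.validate_mapping_completeness_of!(
    :accessible_cluster_names_to_index_into,
    *index_definitions # the index definitions you are about to write to
  )
rescue ConfigError => e
  # The message embeds a diff per out-of-date cluster/index pair, using the
  # datastore mapping as the base and the expected mapping as the diff.
  logger.error(e.message)
  raise
end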
# File 'lib/elastic_graph/indexer/datastore_indexing_router.rb', line 327

def validate_mapping_completeness_of!(index_cluster_name_method, *index_definitions)
  diffs_by_cluster_and_index_name = index_definitions.reduce(_ = {}) do |accum, index_def|
    accum.merge(mapping_diffs_for(index_def, index_cluster_name_method))
  end

  if diffs_by_cluster_and_index_name.any?
    formatted_diffs = diffs_by_cluster_and_index_name.map do |(cluster_name, index_name), diff|
      <<~EOS
        On cluster `#{cluster_name}` and index/template `#{index_name}`:
        #{diff}
      EOS
    end.join("\n\n")

    raise ConfigError, "Datastore index mappings are incomplete compared to the current schema. " \
      "The diff below uses the datastore index mapping as the base, and shows the expected mapping as a diff. " \
      "\n\n#{formatted_diffs}"
  end
end