Module: ElasticGraph::Indexer::Operation
- Defined in:
  - lib/elastic_graph/indexer/operation/result.rb
  - lib/elastic_graph/indexer/operation/update.rb
  - lib/elastic_graph/indexer/operation/upsert.rb
  - lib/elastic_graph/indexer/operation/factory.rb
  - lib/elastic_graph/indexer/operation/count_accumulator.rb
Defined Under Namespace
Constant Summary
- Result =
Describes the result of an operation. The `:category` value will be one of `[:success, :noop, :failure]`.
```ruby
::Data.define(:category, :operation, :description) do
  # @implements Result
  def self.success_of(operation)
    Result.new(category: :success, operation: operation, description: nil)
  end

  def self.noop_of(operation, description)
    Result.new(category: :noop, operation: operation, description: description)
  end

  def self.failure_of(operation, description)
    Result.new(category: :failure, operation: operation, description: description)
  end

  def operation_type
    operation.type
  end

  def event
    operation.event
  end

  def event_id
    EventID.from_event(event)
  end

  def summary
    # :nocov: -- `description == nil` case is not covered; not simple to test.
    suffix = description ? "--#{description}" : nil
    # :nocov:
    "<#{operation.description} #{event_id} #{category}#{suffix}>"
  end

  def inspect
    parts = [
      self.class.name,
      operation_type.inspect,
      category.inspect,
      event_id,
      description
    ].compact

    "#<#{parts.join(" ")}>"
  end
  alias_method :to_s, :inspect
end
```
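As a rough usage sketch (the `operation` below is assumed to be an existing `Upsert` or `Update` instance; it is not constructed here):

```ruby
# Hypothetical sketch, assuming `operation` is an already-built operation instance.
success = Result.success_of(operation)
success.category # => :success

noop = Result.noop_of(operation, "version 3 is not newer than existing version 4")
noop.category    # => :noop
noop.description # => "version 3 is not newer than existing version 4"

# `summary` and `inspect` combine the operation type, event id, and category
# into a single human-readable string, which is useful for logging.
```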
- Upsert =
```ruby
Support::MemoizableData.define(:event, :destination_index_def, :record_preparer) do
  # @implements Upsert
  def to_datastore_bulk
    @to_datastore_bulk ||= [{index: metadata}, prepared_record]
  end

  def categorize(response)
    index = response.fetch("index")
    status = index.fetch("status")

    case status
    when 200..299
      Result.success_of(self)
    when 409
      Result.noop_of(self, index.fetch("error").fetch("reason"))
    else
      Result.failure_of(self, index.fetch("error").fetch("reason"))
    end
  end

  def doc_id
    @doc_id ||= event.fetch("id")
  end

  def type
    :upsert
  end

  def description
    "#{event.fetch("type")} upsert"
  end

  def versioned?
    true
  end

  private

  def metadata
    @metadata ||= {
      _index: destination_index_def.index_name_for_writes(prepared_record),
      _id: doc_id,
      version: event.fetch("version"),
      version_type: "external",
      routing: destination_index_def.routing_value_for_prepared_record(prepared_record)
    }.compact
  end

  def prepared_record
    @prepared_record ||= record_preparer.prepare_for_index(event.fetch("type"), event.fetch("record"))
  end
end
```
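As a hedged illustration of how an upsert maps onto the datastore bulk API (the index name, id, and record below are made up; real instances are built elsewhere in the indexer):

```ruby
# Illustration only; the metadata values shown are made up.
upsert.to_datastore_bulk
# => [
#      {index: {_index: "widgets_rollover__2024", _id: "123", version: 3, version_type: "external"}},
#      {"id" => "123", "name" => "Thingamajig"}
#    ]

# `categorize` interprets one item of the datastore's bulk response:
upsert.categorize({"index" => {"status" => 200}})
# => success Result
upsert.categorize({"index" => {"status" => 409, "error" => {"reason" => "version conflict"}}})
# => noop Result -- with external versioning, a 409 means the datastore already
#    has this document at the same or a newer version
```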
- CountAccumulator =
Responsible for maintaining state and accumulating list counts while we traverse the `data` we are preparing to update in the index. Much of the complexity here comes from the fact that we have three kinds of list fields: scalar lists, embedded object lists, and `nested` object lists.
The Elasticsearch/OpenSearch `nested` type[^1] indexes objects of this type as separate hidden documents. As a result, each `nested` object type gets its own `__counts` field. In contrast, embedded object lists get flattened into separate entries (one per field path) in a flat map (with `dot_separated_path: values_at_path` entries) at the document root.
We mirror this structure with our `__counts`: each document (either a root document or a hidden `nested` document) gets its own `__counts` field, so we essentially have multiple "count parents". Each `__counts` field is a map, keyed by field paths, containing the number of list elements at each field path after the flattening has occurred.
The index mapping defines where the `__counts` fields go. This abstraction uses the mapping to determine when it needs to create a new "count parent".
Note: instances of this class are "shallow immutable" (none of the attributes of an instance can be reassigned), but the `counts` attribute is itself a mutable hash; we use it to accumulate the list counts as we traverse the structure.
[^1]: https://www.elastic.co/guide/en/elasticsearch/reference/8.9/nested.html
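To make that structure concrete, here is an illustrative sketch. The field names and counts are made up, the `|` path separator stands in for whatever `LIST_COUNTS_FIELD_PATH_KEY_SEPARATOR` is defined as, and it assumes all of these paths are included in `list_counts_field_paths_for_source`:

```ruby
# Illustrative only; field names are assumptions for the example.
data = {
  "tags" => ["red", "blue"],                       # scalar list
  "parts" => [{"size" => 1}, {"size" => 2}],       # embedded object list (flattened)
  "seasons" => [                                   # `nested` object list (hidden documents)
    {"year" => 2020, "players" => ["a", "b"]}
  ]
}

# The root document's `__counts` map would end up with flattened entries roughly like:
#   {"tags" => 2, "parts" => 2, "parts|size" => 2, "seasons" => 1}
# ...while each hidden `nested` `seasons` document carries its own `__counts`:
#   {"players" => 2}
```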
```ruby
::Data.define(
  # Hash containing the counts we have accumulated so far. This hash gets mutated as we accumulate,
  # and multiple accumulator instances share the same hash instance. However, a new `counts` hash will
  # be created when we reach a new parent.
  :counts,
  # String describing our current location in the traversed structure relative to the current parent.
  # This gets replaced on new accumulator instances as we traverse the data structure.
  :path_from_parent,
  # String describing our current location in the traversed structure relative to the overall document root.
  # This gets replaced on new accumulator instances as we traverse the data structure.
  :path_from_root,
  # The index mapping at the current level of the structure when this accumulator instance was created.
  # As we traverse new levels of the data structure, new `CountAccumulator` instances will be created with
  # the `mapping` updated to reflect the new level of the structure we are at.
  :mapping,
  # Set of field paths to subfields of `LIST_COUNTS_FIELD` for the current source relationship.
  # This will be used to determine which subfields of the `LIST_COUNTS_FIELD` are populated.
  :list_counts_field_paths_for_source,
  # Indicates if our current path is underneath a list; if so, `maybe_increment` will increment when called.
  :has_list_ancestor
) do
  # @implements CountAccumulator
  def self.merge_list_counts_into(params, mapping:, list_counts_field_paths_for_source:)
    # Here we compute the counts of our list elements so that we can index it.
    data = compute_list_counts_of(params.fetch("data"), CountAccumulator.new_parent(
      # We merge in `type: nested` since the `nested` type indicates a new count accumulator parent
      # and we want that applied at the root.
      mapping.merge("type" => "nested"),
      list_counts_field_paths_for_source
    ))

    # The root `__counts` field needs special handling due to our `sourced_from` feature. Anything in `data`
    # will overwrite what's in the specified fields when the script executes, but since there could be list
    # fields from multiple sources, we need `__counts` to get merged properly. So here we "promote" it from
    # `data.__counts` to being a root-level parameter.
    params.merge(
      "data" => data.except(LIST_COUNTS_FIELD),
      LIST_COUNTS_FIELD => data[LIST_COUNTS_FIELD]
    )
  end

  def self.compute_list_counts_of(value, parent_accumulator)
    case value
    when nil
      value
    when ::Hash
      parent_accumulator.maybe_increment
      parent_accumulator.process_hash(value) do |key, subvalue, accumulator|
        [key, compute_list_counts_of(subvalue, accumulator[key])]
      end
    when ::Array
      parent_accumulator.process_list(value) do |element, accumulator|
        compute_list_counts_of(element, accumulator)
      end
    else
      parent_accumulator.maybe_increment
      value
    end
  end

  # Creates an initially empty accumulator instance for a new parent (either at the overall document
  # root or at the root of a `nested` object).
  def self.new_parent(mapping, list_counts_field_paths_for_source, path_from_root: nil)
    count_field_prefix = path_from_root ? "#{path_from_root}.#{LIST_COUNTS_FIELD}." : "#{LIST_COUNTS_FIELD}."

    initial_counts = (mapping.dig("properties", LIST_COUNTS_FIELD, "properties") || {}).filter_map do |field, _|
      [field, 0] if list_counts_field_paths_for_source.include?(count_field_prefix + field)
    end.to_h

    new(initial_counts, nil, path_from_root, mapping, list_counts_field_paths_for_source, false)
  end

  # Processes the given hash, beginning a new parent if needed. A new parent is needed if the
  # current mapping has a `__counts` field.
  #
  # Yields repeatedly (once per hash entry). We yield the entry key/value, and an accumulator
  # instance (either the current `self` or a new parent).
  #
  # Afterwards, merges the resulting `__counts` into the hash before it's returned, as needed.
  def process_hash(hash)
    mapping_type = mapping["type"]

    # As we traverse through the JSON object structure, we also have to traverse through the
    # condensed mapping. Doing this requires that the `properties` of the index mapping
    # match the fields of the JSON data structure. However, Elasticsearch/OpenSearch have a number of field
    # types which can be represented as a JSON object in an indexing call, but which have no
    # `properties` in the mapping. We can't successfully traverse through the JSON data and the
    # mapping when we encounter these field types (since the mapping has no record of the
    # subfields) so we must treat these types as a special case; we can't proceed, and we won't
    # have any lists to count, anyway.
    return hash if DATASTORE_PROPERTYLESS_OBJECT_TYPES.include?(mapping_type)

    # The `nested` type indicates a new document level, so if it's not `nested`, we should process
    # the hash without making a new parent.
    return hash.to_h { |key, value| yield key, value, self } unless mapping_type == "nested"

    # ...but otherwise, we should make a new parent.
    new_parent = CountAccumulator.new_parent(mapping, list_counts_field_paths_for_source, path_from_root: path_from_root)
    updated_hash = hash.to_h { |key, value| yield key, value, new_parent }

    # If we have a LIST_COUNTS_FIELD at this level of our mapping, we should merge in the counts hash from the new parent.
    if mapping.dig("properties", LIST_COUNTS_FIELD)
      updated_hash.merge(LIST_COUNTS_FIELD => new_parent.counts)
    else
      updated_hash
    end
  end

  # Processes the given list, tracking the fact that subpaths have a list ancestor.
  def process_list(list)
    child_accumulator = with(has_list_ancestor: true)
    list.map { |value| yield value, child_accumulator }
  end

  # Increments the count at the current `path_from_parent` in the current parent's counts hash
  # if we are under a list.
  def maybe_increment
    return unless has_list_ancestor

    key = path_from_parent.to_s
    counts[key] = counts.fetch(key) + 1
  end

  # Creates a "child" accumulator at the given subpath. Should be used as we traverse the data structure.
  def [](subpath)
    with(
      path_from_parent: path_from_parent ? "#{path_from_parent}#{LIST_COUNTS_FIELD_PATH_KEY_SEPARATOR}#{subpath}" : subpath,
      path_from_root: path_from_root ? "#{path_from_root}.#{subpath}" : subpath,
      mapping: mapping.fetch("properties").fetch(subpath)
    )
  end
end
```
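For orientation, here is a hedged sketch of what the `merge_list_counts_into` entry point produces. It assumes `LIST_COUNTS_FIELD` is `"__counts"` and uses a deliberately simplified, hand-written mapping; real mappings are generated from the schema rather than written by hand:

```ruby
require "set"

# Simplified mapping, assumed for this example only.
index_mapping = {
  "properties" => {
    "name" => {"type" => "keyword"},
    "tags" => {"type" => "keyword"},
    "__counts" => {"properties" => {"tags" => {"type" => "integer"}}}
  }
}

params = {"data" => {"name" => "Widget A", "tags" => ["red", "blue"]}}

updated = CountAccumulator.merge_list_counts_into(
  params,
  mapping: index_mapping,
  list_counts_field_paths_for_source: Set.new(["__counts.tags"])
)

updated["data"]     # => {"name" => "Widget A", "tags" => ["red", "blue"]}
updated["__counts"] # => {"tags" => 2} -- promoted to a root-level parameter for the update script
```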