Class: ElasticGraph::Indexer::Processor

Inherits:
Object
Defined in:
lib/elastic_graph/indexer/processor.rb

Instance Method Summary

Constructor Details

#initialize(datastore_router:, operation_factory:, logger:, indexing_latency_slo_thresholds_by_timestamp_in_ms:, clock: ::Time) ⇒ Processor

Returns a new instance of Processor.



17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/elastic_graph/indexer/processor.rb', line 17

# Builds a processor from its collaborators.
#
# @param datastore_router the object whose `bulk` method receives the built operations
# @param operation_factory the object whose `build(event)` method produces operations for an event
# @param logger stored as-is for use by the processor
# @param indexing_latency_slo_thresholds_by_timestamp_in_ms stored as-is; per its name, presumably
#   latency SLO thresholds (in ms) keyed by timestamp field — TODO confirm
# @param clock source of the current time; defaults to `::Time`
def initialize(
  datastore_router:,
  operation_factory:,
  logger:,
  indexing_latency_slo_thresholds_by_timestamp_in_ms:,
  clock: ::Time
)
  # Collaborators used to build and write operations.
  @operation_factory = operation_factory
  @datastore_router = datastore_router

  # Supporting dependencies and configuration.
  @logger = logger
  @clock = clock
  @indexing_latency_slo_thresholds_by_timestamp_in_ms = indexing_latency_slo_thresholds_by_timestamp_in_ms
end

Instance Method Details

#process(events, refresh_indices: false) ⇒ Object

Processes the given events, writing them to the datastore. If any events are invalid, an exception is raised explaining why they were invalid, but the valid events are still written to the datastore. No attempt is made to provide atomic “all or nothing” behavior.



35
36
37
38
39
# File 'lib/elastic_graph/indexer/processor.rb', line 35

# Writes the given events to the datastore, raising if any of them failed.
#
# Valid events are still written even when others fail; there is no atomic
# "all or nothing" behavior.
#
# @param events the events to process
# @param refresh_indices forwarded unchanged to `process_returning_failures`
# @return [nil] when no failures were reported
# @raise [IndexingFailuresError] built from the failures and the original events
#   when at least one failure was reported
def process(events, refresh_indices: false)
  reported = process_returning_failures(events, refresh_indices: refresh_indices)
  raise IndexingFailuresError.for(failures: reported, events: events) unless reported.empty?
end

#process_returning_failures(events, refresh_indices: false) ⇒ Object

Like `process`, but returns the failures instead of raising an exception. The caller is responsible for handling the failures.



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/elastic_graph/indexer/processor.rb', line 43

# Like `process`, but hands the failures back instead of raising.
#
# Steps:
# 1. Each event is passed to the operation factory.
# 2. All resulting operations are sent to the datastore router in one `bulk` call.
# 3. The successful and no-op results are passed to `calculate_latency_metrics`.
# 4. Every failure is collected and given to `categorize_failures` together with
#    the original events. Failures come from two places: errors reported by the
#    factory, and per-operation failures reported by the bulk write.
#
# @param events the events to process
# @param refresh_indices forwarded to the router's `bulk` call as `refresh:`
# @return whatever `categorize_failures` returns
def process_returning_failures(events, refresh_indices: false)
  built_by_event = events.to_h { |event| [event, @operation_factory.build(event)] }
  built = built_by_event.values

  write_outcome = @datastore_router.bulk(built.flat_map(&:operations), refresh: refresh_indices)
  succeeded = write_outcome.successful_operations(check_failures: false)
  calculate_latency_metrics(succeeded, write_outcome.noop_results)

  # Factory-reported errors come first, followed by failures from the bulk write.
  build_failures = built.map(&:failed_event_error).compact
  write_failures = write_outcome.failure_results.map do |failed_result|
    # Pass along every operation built for the failed result's event, as a Set.
    event_operations = built_by_event.fetch(failed_result.event).operations.to_set
    FailedEventError.from_failed_operation_result(failed_result, event_operations)
  end

  categorize_failures(build_failures + write_failures, events)
end