Class: ElasticGraph::Indexer::Processor
- Inherits: Object
- Defined in: lib/elastic_graph/indexer/processor.rb
Instance Method Summary
- #initialize(datastore_router:, operation_factory:, logger:, indexing_latency_slo_thresholds_by_timestamp_in_ms:, clock: ::Time) ⇒ Processor (constructor)
  A new instance of Processor.
- #process(events, refresh_indices: false) ⇒ Object
  Processes the given events, writing them to the datastore.
- #process_returning_failures(events, refresh_indices: false) ⇒ Object
  Like `process`, but returns failures instead of raising an exception.
Constructor Details
#initialize(datastore_router:, operation_factory:, logger:, indexing_latency_slo_thresholds_by_timestamp_in_ms:, clock: ::Time) ⇒ Processor
Returns a new instance of Processor.
    # File 'lib/elastic_graph/indexer/processor.rb', line 17
    def initialize(
      datastore_router:,
      operation_factory:,
      logger:,
      indexing_latency_slo_thresholds_by_timestamp_in_ms:,
      clock: ::Time
    )
      @datastore_router = datastore_router
      @operation_factory = operation_factory
      @clock = clock
      @logger = logger
      @indexing_latency_slo_thresholds_by_timestamp_in_ms = indexing_latency_slo_thresholds_by_timestamp_in_ms
    end
Instance Method Details
#process(events, refresh_indices: false) ⇒ Object
Processes the given events, writing them to the datastore. If any events are invalid, an exception will be raised indicating why the events were invalid, but the valid events will still be written to the datastore. No attempt is made to provide atomic “all or nothing” behavior.
    # File 'lib/elastic_graph/indexer/processor.rb', line 35
    def process(events, refresh_indices: false)
      failures = process_returning_failures(events, refresh_indices: refresh_indices)
      return if failures.empty?
      raise IndexingFailuresError.for(failures: failures, events: events)
    end
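The "write what is valid, then raise" contract described for `process` can be illustrated with a minimal sketch. `ToyProcessor` and `ToyIndexingFailuresError` are hypothetical stand-ins invented for this example, not ElasticGraph classes, and the validity rule (an event must have an "id" key) is an assumption made only to keep the example small.

```ruby
# Hypothetical stand-ins for illustration only; not part of ElasticGraph.
class ToyIndexingFailuresError < StandardError; end

class ToyProcessor
  attr_reader :written

  def initialize
    @written = []
  end

  # Same shape as the documented method: delegate, then raise if anything failed.
  def process(events)
    failures = process_returning_failures(events)
    return if failures.empty?
    raise ToyIndexingFailuresError, "#{failures.size} of #{events.size} events failed"
  end

  # Assumed validity rule: an event is valid when it has an "id" key.
  def process_returning_failures(events)
    valid, invalid = events.partition { |event| event.key?("id") }
    @written.concat(valid) # valid events are written even if others fail
    invalid
  end
end

processor = ToyProcessor.new
events = [{"id" => "1"}, {"name" => "no id"}, {"id" => "2"}]

error_message = nil
begin
  processor.process(events)
rescue ToyIndexingFailuresError => e
  error_message = e.message
end
```

After the rescue, `processor.written` still holds the two valid events, so a caller that rescues the error should not assume that nothing was written.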
#process_returning_failures(events, refresh_indices: false) ⇒ Object
Like `process`, but returns failures instead of raising an exception. The caller is responsible for handling the failures.
    # File 'lib/elastic_graph/indexer/processor.rb', line 43
    def process_returning_failures(events, refresh_indices: false)
      factory_results_by_event = events.to_h { |event| [event, @operation_factory.build(event)] }

      factory_results = factory_results_by_event.values

      bulk_result = @datastore_router.bulk(factory_results.flat_map(&:operations), refresh: refresh_indices)
      successful_operations = bulk_result.successful_operations(check_failures: false)

      calculate_latency_metrics(successful_operations, bulk_result.noop_results)

      all_failures =
        factory_results.map(&:failed_event_error).compact +
        bulk_result.failure_results.map do |result|
          all_operations_for_event = factory_results_by_event.fetch(result.event).operations
          FailedEventError.from_failed_operation_result(result, all_operations_for_event.to_set)
        end

      categorize_failures(all_failures, events)
    end
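The listing above collects failures from two sources: events that could not be turned into operations, and operations the datastore rejected. The following sketch mimics that merge with plain structs. Every name in it (`factory_result`, `bulk_failure`, the string-based errors, the simulated rejection of event "c") is an assumption made for illustration and does not reflect the real ElasticGraph result types.

```ruby
# Hypothetical stand-ins for illustration only; not ElasticGraph classes.
factory_result = Struct.new(:operations, :failed_event_error)
bulk_failure = Struct.new(:event, :description)

events = ["a", "bad", "c"]

# Step 1: build operations per event. Here "bad" fails at build time.
factory_results_by_event = events.to_h do |event|
  result =
    if event == "bad"
      factory_result.new([], "#{event}: invalid")
    else
      factory_result.new(["upsert #{event}"], nil)
    end
  [event, result]
end
factory_results = factory_results_by_event.values

# Step 2: only buildable events contribute operations to the bulk request.
operations = factory_results.flat_map(&:operations)

# Simulated datastore response: assume the operation for event "c" was rejected.
bulk_failures = [bulk_failure.new("c", "version conflict")]

# Step 3: merge build-time failures with datastore failures into one list.
all_failures =
  factory_results.map(&:failed_event_error).compact +
  bulk_failures.map { |failure| "#{failure.event}: #{failure.description}" }
```

Keeping the per-event lookup (`factory_results_by_event`) is what lets a datastore failure be traced back to the event, and to all operations built for it, as the real method does with `fetch(result.event)`.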