Class: Gitlab::Ci::Components::Usages::Aggregator
- Inherits: Object
- Includes: ExclusiveLeaseGuard
- Defined in: lib/gitlab/ci/components/usages/aggregator.rb
Overview
Component usage is defined as the number of unique `used_by_project_id`s in the table `p_catalog_resource_component_usages` for a given scope.
This aggregator is intended to be run in a scheduled cron job. It implements a “continue later” mechanism with a Redis cursor, which enables the work to continue from where it was last interrupted on each run. It iterates through the target table in batches, in order of ID ascending. For each target ID, it collects the usage count using `distinct_each_batch` for the given usage window. We collect the count in Rails because the SQL query `COUNT(DISTINCT(*))` is not performant when the data volume is large.
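The "continue later" idea and the in-application distinct count can be illustrated with a minimal sketch. Everything here is a hypothetical stand-in rather than the aggregator's own code: `usage_rows` mimics usage table rows as `[target_id, used_by_project_id]` pairs, a plain Hash mimics the Redis cursor, and `max_batches` stands in for the runtime limit.

```ruby
require 'set'

# Counts unique project IDs for one target in application code
# instead of asking the database for a COUNT(DISTINCT ...).
def distinct_usage_count(usage_rows, target_id)
  usage_rows.each_with_object(Set.new) do |(row_target_id, project_id), seen|
    seen << project_id if row_target_id == target_id
  end.size
end

# Processes at most `max_batches` batches in ascending ID order, then saves
# the last processed ID so that the next call continues from there.
def run_once(target_ids, usage_rows, cursor, batch_size: 2, max_batches: 1)
  remaining = target_ids.sort.select { |id| id > cursor.fetch(:last_id, 0) }

  remaining.each_slice(batch_size).first(max_batches).each do |batch|
    yield batch.to_h { |id| [id, distinct_usage_count(usage_rows, id)] }
    cursor[:last_id] = batch.last
  end

  cursor
end

usage_rows = [[1, 10], [1, 10], [1, 11], [2, 10], [3, 12], [3, 13], [3, 13]]
cursor = {}
seen_batches = []

# First run handles targets 1 and 2, the second run resumes at target 3.
run_once([1, 2, 3], usage_rows, cursor) { |counts| seen_batches << counts }
run_once([1, 2, 3], usage_rows, cursor) { |counts| seen_batches << counts }
```

Duplicate rows for the same project (for example the two `[1, 10]` rows) contribute only once to a target's count.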
RUNTIME: The actual total runtime will be longer than MAX_RUNTIME because
it depends on the execution time of `&usage_counts_block`.
EXCLUSIVE LEASE: This aggregator is protected from parallel processing with an exclusive lease guard.
WORKER: The worker running this service should be scheduled at the same cadence as MAX_RUNTIME, with:
deduplicate :until_executed, if_deduplicated: :reschedule_once, ttl: WORKER_DEDUP_TTL
STOPPING: When the aggregator’s cursor advances past the max target_id, it resets to 0. This means
it may reprocess targets that have already been processed for the given usage window.
To minimize redundant reprocessing, you should prevent the aggregator from running once it
meets a certain stop condition (e.g. when all targets have been marked as "processed").
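The wrap-around and the suggested stop condition can be sketched as follows. Both helpers are hypothetical and only model the behavior described above; the per-target window markers (`'2024-05'` and so on) are an assumed way of recording which targets are "processed".

```ruby
# Hypothetical helper: moves the cursor forward by one batch and wraps to 0
# once the position passes the highest target ID.
def next_cursor_position(last_id, max_target_id, batch_size)
  candidate = last_id + batch_size
  candidate > max_target_id ? 0 : candidate
end

# Hypothetical stop condition: every target is already marked as processed
# for the current usage window.
def done_processing?(processed_window_by_target, current_window)
  processed_window_by_target.values.all? { |window| window == current_window }
end

# With a max target ID of 5 and batches of 2, the third step wraps to 0.
positions = [0]
3.times { positions << next_cursor_position(positions.last, 5, 2) }

marks = { 1 => '2024-05', 2 => '2024-05', 3 => '2024-04' }
skip_before = done_processing?(marks, '2024-05') # target 3 is still stale
marks[3] = '2024-05'
skip_after = done_processing?(marks, '2024-05')  # the job can now return early
```

Without a check like `done_processing?`, a run that starts after the wrap would begin again from the lowest target ID.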
Usage
each_batch:
- Yields each batch of `usage_counts` to the given block. The block should:
  - Be able to handle targets that might be reprocessed multiple times.
  - Not exceed 1 minute in execution time.
- `usage_counts` format: { target_object1 => 100, target_object2 => 200, ... }
- If the lease is obtained, returns a Result containing `total_targets_completed` and `cursor_attributes`. Otherwise, returns nil.
Example:
  return if done_processing?

  aggregator = Gitlab::Ci::Components::Usages::Aggregator.new(
    target_model: Ci::Catalog::Resource,
    group_by_column: :catalog_resource_id,
    usage_start_date: Date.today - 30.days,
    usage_end_date: Date.today - 1.day,
    lease_key: 'my_aggregator_service_lease_key'
  )

  result = aggregator.each_batch do |usage_counts|
    # Bulk update usage counts in the database
  end
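Because a target may be yielded more than once, the block is safest when it assigns counts rather than accumulating them. A minimal sketch, assuming a hypothetical in-memory Hash in place of the database and string keys in place of target objects:

```ruby
# Hypothetical in-memory store standing in for a database column.
stored_counts = {}

# Idempotent: assigns the latest count instead of adding to the previous one,
# so a repeated target leaves the stored value unchanged.
apply_counts = lambda do |usage_counts|
  usage_counts.each { |target, count| stored_counts[target] = count }
end

apply_counts.call({ 'resource_a' => 100, 'resource_b' => 200 })
apply_counts.call({ 'resource_a' => 100 }) # the same target delivered again
```

An incrementing block (`stored_counts[target] += count`) would double the value for `resource_a` in this scenario.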
Parameters
- target_model: Target model to iterate through. The model class should contain `include EachBatch`.
- group_by_column: The usage table's foreign key that references the target_model.
- usage_start_date & usage_end_date: Date objects specifying the window of usage data to aggregate.
- lease_key: Used for obtaining an exclusive lease. Also used as part of the cursor Redis key.
Defined Under Namespace
Classes: Result
Constant Summary
- TARGET_BATCH_SIZE = 1000
- DISTINCT_USAGE_BATCH_SIZE = 100
- MAX_RUNTIME = 4.minutes
  Should be >= job scheduling frequency so there is no gap between job runs.
- WORKER_DEDUP_TTL = MAX_RUNTIME + 1.minute
  See gitlab.com/gitlab-org/gitlab/-/merge_requests/155001#note_1941066672. Includes extra time (1.minute) to execute `&usage_counts_block`.
- LEASE_TIMEOUT = 10.minutes
Instance Method Summary
- #each_batch(&usage_counts_block) ⇒ Object
- #initialize(target_model:, group_by_column:, usage_start_date:, usage_end_date:, lease_key:) ⇒ Aggregator (constructor)
  A new instance of Aggregator.
Methods included from ExclusiveLeaseGuard
#exclusive_lease, #lease_key, #lease_release?, #lease_taken_log_level, #lease_taken_message, #log_lease_taken, #release_lease, #renew_lease!, #try_obtain_lease
Constructor Details
#initialize(target_model:, group_by_column:, usage_start_date:, usage_end_date:, lease_key:) ⇒ Aggregator
Returns a new instance of Aggregator.
# File 'lib/gitlab/ci/components/usages/aggregator.rb', line 74

def initialize(target_model:, group_by_column:, usage_start_date:, usage_end_date:, lease_key:)
  @target_model = target_model
  @group_by_column = group_by_column
  @lease_key = lease_key # Used by ExclusiveLeaseGuard
  @runtime_limiter = Gitlab::Metrics::RuntimeLimiter.new(MAX_RUNTIME)

  @cursor = Aggregators::Cursor.new(
    redis_key: "#{lease_key}:cursor",
    target_model: target_model,
    usage_window: Aggregators::Cursor::Window.new(usage_start_date, usage_end_date)
  )
end
Instance Method Details
#each_batch(&usage_counts_block) ⇒ Object
# File 'lib/gitlab/ci/components/usages/aggregator.rb', line 87

def each_batch(&usage_counts_block)
  try_obtain_lease do
    total_targets_completed = process_targets(&usage_counts_block)

    Result.new(total_targets_completed: total_targets_completed, cursor_attributes: cursor.attributes)
  end
end
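Callers need to handle the nil return when the lease is already held. The sketch below models only that contract. `with_lease` and the `held_leases` array are hypothetical stand-ins and do not reproduce how `ExclusiveLeaseGuard#try_obtain_lease` is implemented.

```ruby
# Hypothetical stand-in for a lease guard: runs the block only when the key
# is free and returns the block's value, otherwise returns nil.
def with_lease(held_leases, key)
  return nil if held_leases.include?(key)

  held_leases << key
  begin
    yield
  ensure
    held_leases.delete(key) # always release, even if the block raises
  end
end

held_leases = []

# Lease is free: the block runs and its value is returned.
first = with_lease(held_leases, 'my_lease_key') { { total_targets_completed: 3 } }

# Simulate another process holding the lease: the block is skipped.
held_leases << 'my_lease_key'
second = with_lease(held_leases, 'my_lease_key') { { total_targets_completed: 3 } }

completed = second ? second[:total_targets_completed] : 0
```

Treating nil as "nothing processed in this run" keeps the caller from raising when two jobs overlap.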