Class: Gitlab::Ci::Components::Usages::Aggregator

Inherits:
Object
  • Object
Includes:
ExclusiveLeaseGuard
Defined in:
lib/gitlab/ci/components/usages/aggregator.rb

Overview

Component usage is defined as the number of unique `used_by_project_id`s in the table `p_catalog_resource_component_usages` for a given scope.
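To make the definition concrete, here is a minimal plain-Ruby sketch of counting unique projects per target. It is not the aggregator's implementation: `UsageRow`, `usage_counts`, and the `used_date` field are hypothetical stand-ins for rows of the usage table and the usage window.

```ruby
require 'date'
require 'set'

# Hypothetical in-memory row standing in for p_catalog_resource_component_usages.
UsageRow = Struct.new(:catalog_resource_id, :used_by_project_id, :used_date)

# Counts unique used_by_project_id values per target within the window.
def usage_counts(rows, group_by:, window:)
  projects_by_target = Hash.new { |hash, key| hash[key] = Set.new }

  rows.each do |row|
    next unless window.cover?(row.used_date)

    projects_by_target[row[group_by]] << row.used_by_project_id
  end

  projects_by_target.transform_values(&:size)
end

rows = [
  UsageRow.new(1, 10, Date.new(2024, 6, 1)),
  UsageRow.new(1, 10, Date.new(2024, 6, 2)), # same project again, counted once
  UsageRow.new(1, 11, Date.new(2024, 6, 3)),
  UsageRow.new(2, 10, Date.new(2024, 6, 3)),
  UsageRow.new(2, 12, Date.new(2024, 5, 1))  # outside the window, ignored
]

window = Date.new(2024, 6, 1)..Date.new(2024, 6, 30)
counts = usage_counts(rows, group_by: :catalog_resource_id, window: window)
```

With this sample data, target 1 has two unique projects and target 2 has one.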

This aggregator is intended to be run in a scheduled cron job. It implements a “continue later” mechanism with a Redis cursor, which enables the work to continue from where it was last interrupted on each run. It iterates through the target table in batches, in order of ID ascending. For each target ID, it collects the usage count using `distinct_each_batch` for the given usage window. We collect the count in Rails because the SQL query `COUNT(DISTINCT(*))` is not performant when the data volume is large.
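The “continue later” idea can be sketched in plain Ruby as follows. This is an illustration under stated assumptions, not the real cursor: `SketchCursor` and `run_once` are hypothetical, a Hash stands in for Redis, and `max_batches` stands in for the runtime limit.

```ruby
# Hypothetical cursor that remembers the last processed target ID.
# A plain Hash stands in for Redis here.
class SketchCursor
  attr_reader :target_id

  def initialize(store, key)
    @store = store
    @key = key
    @target_id = store.fetch(key, 0)
  end

  def advance_to(id)
    @target_id = id
  end

  def save!
    @store[@key] = @target_id
  end
end

# Processes at most max_batches batches in ascending ID order,
# then persists the cursor so the next run can resume.
def run_once(store, target_ids, batch_size:, max_batches:)
  cursor = SketchCursor.new(store, 'sketch:cursor')
  processed = []

  remaining = target_ids.select { |id| id > cursor.target_id }.sort
  remaining.each_slice(batch_size).first(max_batches).each do |batch|
    processed.concat(batch)
    cursor.advance_to(batch.last)
  end

  cursor.save!
  processed
end

store = {}
ids = (1..10).to_a
first_run = run_once(store, ids, batch_size: 2, max_batches: 2)
second_run = run_once(store, ids, batch_size: 2, max_batches: 2)
```

In this sketch the first run handles IDs 1 to 4 and the second run resumes at ID 5.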

RUNTIME: The actual total runtime will be longer than MAX_RUNTIME because it depends on the execution time of `&usage_counts_block`.

EXCLUSIVE LEASE: This aggregator is protected from parallel processing with an exclusive lease guard.

WORKER: The worker running this service should be scheduled at the same cadence as MAX_RUNTIME, with:

deduplicate :until_executed, if_deduplicated: :reschedule_once, ttl: WORKER_DEDUP_TTL

STOPPING: When the aggregator’s cursor advances past the max target_id, it resets to 0. This means it may reprocess targets that have already been processed for the given usage window. To minimize redundant reprocessing, you should prevent the aggregator from running once it meets a certain stop condition (e.g. when all targets have been marked as "processed").
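One possible shape for such a stop condition is sketched below. Everything here is an assumption for illustration: `SketchTarget`, its `last_processed_on` field, and this `done_processing?` helper are hypothetical and do not come from the aggregator.

```ruby
require 'date'

# Hypothetical target record that remembers when it was last processed.
SketchTarget = Struct.new(:id, :last_processed_on)

# True once every target has been marked as processed for the given day.
def done_processing?(targets, today)
  targets.all? { |target| target.last_processed_on == today }
end

today = Date.new(2024, 7, 1)
targets = [SketchTarget.new(1, today), SketchTarget.new(2, nil)]

done_before = done_processing?(targets, today)
targets.last.last_processed_on = today # simulate finishing the last target
done_after = done_processing?(targets, today)
```

A caller could check a condition like this before invoking the aggregator, so that a cursor reset to 0 does not trigger another full pass.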

Usage

each_batch:

- Yields each batch of `usage_counts` to the given block. The block should:
  - Be able to handle targets that might be reprocessed multiple times.
  - Not exceed 1 minute in execution time.
- `usage_counts` format: { target_object1 => 100, target_object2 => 200, ... }
- If the lease is obtained, returns a Result containing `total_targets_completed` and
  `cursor_attributes`. Otherwise, returns nil.

Example:

return if done_processing?

aggregator = Gitlab::Ci::Components::Usages::Aggregator.new(
  target_model: Ci::Catalog::Resource,
  group_by_column: :catalog_resource_id,
  usage_start_date: Date.today - 30.days,
  usage_end_date: Date.today - 1.day,
  lease_key: 'my_aggregator_service_lease_key'
)

result = aggregator.each_batch do |usage_counts|
  # Bulk update usage counts in the database
end
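Because targets may be reprocessed, the block should be idempotent. A minimal sketch of that property, assuming a hypothetical `SketchResource` class with a `usage_count` attribute: the block assigns the count rather than incrementing it, so applying the same batch twice leaves the same state.

```ruby
# Hypothetical target object; the real block would update database records.
class SketchResource
  attr_reader :id
  attr_accessor :usage_count

  def initialize(id)
    @id = id
    @usage_count = 0
  end
end

# Assigning (not incrementing) keeps repeated runs harmless.
apply_counts = lambda do |usage_counts|
  usage_counts.each { |resource, count| resource.usage_count = count }
end

resource_a = SketchResource.new(1)
resource_b = SketchResource.new(2)
batch = { resource_a => 100, resource_b => 200 }

apply_counts.call(batch)
apply_counts.call(batch) # simulated reprocessing of the same batch
```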

Parameters

- target_model: Target model to iterate through. The model class should contain `include EachBatch`.
- group_by_column: This should be the usage table’s foreign key of the target_model.
- usage_start_date & usage_end_date: Date objects specifying the window of usage data to aggregate.
- lease_key: Used for obtaining an exclusive lease. Also used as part of the cursor Redis key.


Defined Under Namespace

Classes: Result

Constant Summary

TARGET_BATCH_SIZE = 1000

DISTINCT_USAGE_BATCH_SIZE = 100

MAX_RUNTIME = 4.minutes
Should be >= job scheduling frequency so there is no gap between job runs.

WORKER_DEDUP_TTL = MAX_RUNTIME + 1.minute
See gitlab.com/gitlab-org/gitlab/-/merge_requests/155001#note_1941066672. Includes extra time (1.minute) to execute `&usage_counts_block`.

LEASE_TIMEOUT = 10.minutes
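The relationship between the timing constants can be checked with plain arithmetic. The sketch below restates the values in seconds using local variables (the names are illustrative, not the class constants) so it runs without ActiveSupport.

```ruby
# Values restated in seconds; 4.minutes and 1.minute come from the constants above.
max_runtime = 4 * 60
block_allowance = 1 * 60                         # extra time for &usage_counts_block
worker_dedup_ttl = max_runtime + block_allowance # MAX_RUNTIME + 1.minute
lease_timeout = 10 * 60

puts "worker_dedup_ttl=#{worker_dedup_ttl}s lease_timeout=#{lease_timeout}s"
```

With these values the dedup TTL comes to 300 seconds, and the lease timeout (600 seconds) is longer than it.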

Instance Method Summary

Methods included from ExclusiveLeaseGuard

#exclusive_lease, #lease_key, #lease_release?, #lease_taken_log_level, #lease_taken_message, #log_lease_taken, #release_lease, #renew_lease!, #try_obtain_lease

Constructor Details

#initialize(target_model:, group_by_column:, usage_start_date:, usage_end_date:, lease_key:) ⇒ Aggregator

Returns a new instance of Aggregator.



# File 'lib/gitlab/ci/components/usages/aggregator.rb', line 74

def initialize(target_model:, group_by_column:, usage_start_date:, usage_end_date:, lease_key:)
  @target_model = target_model
  @group_by_column = group_by_column
  @lease_key = lease_key # Used by ExclusiveLeaseGuard
  @runtime_limiter = Gitlab::Metrics::RuntimeLimiter.new(MAX_RUNTIME)

  @cursor = Aggregators::Cursor.new(
    redis_key: "#{lease_key}:cursor",
    target_model: target_model,
    usage_window: Aggregators::Cursor::Window.new(usage_start_date, usage_end_date)
  )
end

Instance Method Details

#each_batch(&usage_counts_block) ⇒ Object



# File 'lib/gitlab/ci/components/usages/aggregator.rb', line 87

def each_batch(&usage_counts_block)
  try_obtain_lease do
    total_targets_completed = process_targets(&usage_counts_block)

    Result.new(total_targets_completed: total_targets_completed, cursor_attributes: cursor.attributes)
  end
end
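The "returns nil when the lease is not obtained" behavior can be illustrated with a small stand-in. This is not ExclusiveLeaseGuard: `SketchLeaseGuard` is a hypothetical in-memory guard that uses a Set of held keys in place of Redis.

```ruby
require 'set'

# Hypothetical in-memory lease guard; a Set of held keys stands in for Redis.
class SketchLeaseGuard
  def initialize(held_keys, lease_key)
    @held_keys = held_keys
    @lease_key = lease_key
  end

  # Yields only if the lease is free; returns nil when it is already taken.
  def try_obtain_lease
    return nil unless @held_keys.add?(@lease_key)

    begin
      yield
    ensure
      @held_keys.delete(@lease_key) # release the lease when the block finishes
    end
  end
end

held = Set.new
key = 'my_aggregator_service_lease_key'

outer = SketchLeaseGuard.new(held, key).try_obtain_lease do
  # A second attempt while the lease is held does not run its block.
  inner = SketchLeaseGuard.new(held, key).try_obtain_lease { :ran }
  [:ran, inner]
end
```

A caller of `each_batch` would therefore treat a nil result as "another run holds the lease" and skip any follow-up work.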