Class: Gitlab::Counters::BufferedCounter
- Inherits:
-
Object
- Object
- Gitlab::Counters::BufferedCounter
- Includes:
- ExclusiveLeaseHelpers
- Defined in:
- lib/gitlab/counters/buffered_counter.rb
Constant Summary collapse
- WORKER_DELAY =
10.minutes
- WORKER_LOCK_TTL =
10.minutes
- REFRESH_KEYS_TTL =
Refresh keys are set to expire after a very long time, so that they do not occupy Redis memory indefinitely, if for any reason they are not deleted. In practice, a refresh is not expected to take longer than this TTL.
14.days
- CLEANUP_BATCH_SIZE =
50
- CLEANUP_INTERVAL_SECONDS =
0.1
- MAX_BITMAP_OFFSET =
Limit size of bitmap key to 2^26-1 (~8MB)
67108863
- LUA_FLUSH_INCREMENT_SCRIPT =
# Lua script run with KEYS = [increment_key, flushed_key].
# Adds the pending value stored at increment_key (0 when the key is absent)
# onto flushed_key, deletes increment_key, and returns the resulting total.
# When that total is 0, flushed_key is deleted as well.
<<~LUA
  local increment_key, flushed_key = KEYS[1], KEYS[2]
  local increment_value = redis.call("get", increment_key) or 0
  local flushed_value = redis.call("incrby", flushed_key, increment_value)
  if flushed_value == 0 then
    redis.call("del", increment_key, flushed_key)
  else
    redis.call("del", increment_key)
  end
  return flushed_value
LUA
- LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT =
# Lua script run with
#   KEYS = [counter_key, refresh_key, refresh_indicator_key,
#           tracking_shard_key, opposing_tracking_shard_key, shards_key]
#   ARGV = [amount, tracking_offset]
# When refresh_indicator_key does not exist, the amount is simply added to
# counter_key. Otherwise the bit at tracking_offset in tracking_shard_key is
# used to skip changes that were already seen, and the amount is added to
# refresh_key only for an increment with no recorded opposing change or a
# decrement with a recorded opposing change. Returns the value of the key
# that was read or incremented.
<<~LUA
  local counter_key, refresh_key, refresh_indicator_key = KEYS[1], KEYS[2], KEYS[3]
  local tracking_shard_key, opposing_tracking_shard_key, shards_key = KEYS[4], KEYS[5], KEYS[6]

  local amount, tracking_offset = tonumber(ARGV[1]), tonumber(ARGV[2])

  -- increment to the counter key when not refreshing
  if redis.call("exists", refresh_indicator_key) == 0 then
    return redis.call("incrby", counter_key, amount)
  end

  -- deduplicate and increment to the refresh counter key while refreshing
  local found_duplicate = redis.call("getbit", tracking_shard_key, tracking_offset)
  if found_duplicate == 1 then
    return redis.call("get", refresh_key)
  end

  redis.call("setbit", tracking_shard_key, tracking_offset, 1)
  redis.call("expire", tracking_shard_key, #{REFRESH_KEYS_TTL.seconds})
  redis.call("sadd", shards_key, tracking_shard_key)
  redis.call("expire", shards_key, #{REFRESH_KEYS_TTL.seconds})

  local found_opposing_change = redis.call("getbit", opposing_tracking_shard_key, tracking_offset)
  local increment_without_previous_decrement = amount > 0 and found_opposing_change == 0
  local decrement_with_previous_increment = amount < 0 and found_opposing_change == 1
  local net_change = 0

  if increment_without_previous_decrement or decrement_with_previous_increment then
    net_change = amount
  end

  return redis.call("incrby", refresh_key, net_change)
LUA
- LUA_INITIATE_REFRESH_SCRIPT =
# Lua script run with KEYS = [counter_key, refresh_indicator_key].
# Deletes counter_key and sets refresh_indicator_key to 1 with an expiry of
# REFRESH_KEYS_TTL seconds.
<<~LUA
  local counter_key, refresh_indicator_key = KEYS[1], KEYS[2]
  redis.call("del", counter_key)
  redis.call("set", refresh_indicator_key, 1, "ex", #{REFRESH_KEYS_TTL.seconds})
LUA
- LUA_FINALIZE_REFRESH_SCRIPT =
# Lua script run with KEYS = [counter_key, refresh_key, refresh_indicator_key].
# Adds the value stored at refresh_key (0 when the key is absent) onto
# counter_key, then deletes refresh_indicator_key and refresh_key.
<<~LUA
  local counter_key, refresh_key, refresh_indicator_key = KEYS[1], KEYS[2], KEYS[3]
  local refresh_amount = redis.call("get", refresh_key) or 0
  redis.call("incrby", counter_key, refresh_amount)
  redis.call("del", refresh_indicator_key, refresh_key)
LUA
Constants included from ExclusiveLeaseHelpers
ExclusiveLeaseHelpers::FailedToObtainLockError
Instance Method Summary collapse
-
#amount_to_be_flushed ⇒ Object
amount_to_be_flushed returns the total value to be flushed.
- #bulk_increment(increments) ⇒ Object
- #cleanup_refresh ⇒ Object
- #commit_increment! ⇒ Object
- #finalize_refresh ⇒ Object
- #flushed_key ⇒ Object
- #get ⇒ Object
- #increment(increment) ⇒ Object
-
#initialize(counter_record, attribute) ⇒ BufferedCounter
constructor
A new instance of BufferedCounter.
- #initiate_refresh! ⇒ Object
- #key ⇒ Object
- #refresh_indicator_key ⇒ Object
- #refresh_key ⇒ Object
Methods included from ExclusiveLeaseHelpers
Constructor Details
#initialize(counter_record, attribute) ⇒ BufferedCounter
Returns a new instance of BufferedCounter.
34 35 36 37 |
# File 'lib/gitlab/counters/buffered_counter.rb', line 34

# Stores the record and the attribute name this buffered counter operates on.
#
# @param counter_record [Object] record whose counter is buffered
# @param attribute [Object] name of the counter attribute on the record
def initialize(counter_record, attribute)
  @counter_record, @attribute = counter_record, attribute
end
Instance Method Details
#amount_to_be_flushed ⇒ Object
amount_to_be_flushed returns the total value to be flushed. The total value is the sum of the following:
- the current value in the increment_key
- any existing value in the flushed_key that has not been flushed yet
169 170 171 172 173 |
# File 'lib/gitlab/counters/buffered_counter.rb', line 169

# Evaluates LUA_FLUSH_INCREMENT_SCRIPT with the counter key and the flushed
# key, and returns whatever the script evaluation yields through redis_state.
#
# @return [Object] result of the script evaluation
def amount_to_be_flushed
  redis_state { |redis| redis.eval(LUA_FLUSH_INCREMENT_SCRIPT, keys: [key, flushed_key]) }
end
#bulk_increment(increments) ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/gitlab/counters/buffered_counter.rb', line 90

# Applies several increments in a single Redis pipeline, then schedules a
# flush of the buffered counter after WORKER_DELAY.
#
# @param increments [Enumerable] increments, each passed to increment_args
# @return [Integer] the last pipeline reply converted with to_i
def bulk_increment(increments)
  replies = redis_state do |redis|
    redis.pipelined do |pipeline|
      increments.each do |item|
        pipeline.eval(LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT, **increment_args(item))
      end
    end
  end

  record_class_name = counter_record.class.name
  FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, record_class_name, counter_record.id, attribute.to_s)

  replies.last.to_i
end
#cleanup_refresh ⇒ Object
137 138 139 140 141 142 143 144 145 146 |
# File 'lib/gitlab/counters/buffered_counter.rb', line 137

# Pops tracking-shard keys from the set at shards_key in batches of
# CLEANUP_BATCH_SIZE and deletes them, pausing CLEANUP_INTERVAL_SECONDS
# between full batches. Stops after the first batch that is not full.
def cleanup_refresh
  redis_state do |redis|
    loop do
      batch = redis.spop(shards_key, CLEANUP_BATCH_SIZE)
      break unless batch

      redis.del(*batch)
      # A short batch means the set has been drained.
      break if batch.size < CLEANUP_BATCH_SIZE

      sleep CLEANUP_INTERVAL_SECONDS
    end
  end
end
#commit_increment! ⇒ Object
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/gitlab/counters/buffered_counter.rb', line 148

# Writes the buffered amount to the record's counter attribute inside
# with_exclusive_lease, skipping the write when there is nothing to flush.
#
# @return [Object] the attribute value read from the record after reset
def commit_increment!
  with_exclusive_lease do
    pending_amount = amount_to_be_flushed

    if pending_amount != 0
      # The transaction lets the counter update roll back if
      # `remove_flushed_key` fails.
      counter_record.transaction do
        counter_record.update_counters({ attribute => pending_amount })
        remove_flushed_key
      end

      counter_record.execute_after_commit_callbacks
    end
  end

  reloaded_record = counter_record.reset
  reloaded_record.read_attribute(attribute)
end
#finalize_refresh ⇒ Object
127 128 129 130 131 132 133 134 135 |
# File 'lib/gitlab/counters/buffered_counter.rb', line 127

# Completes a refresh: evaluates LUA_FINALIZE_REFRESH_SCRIPT with the counter
# key, the refresh key and the refresh indicator key, then schedules a flush
# after WORKER_DELAY and enqueues the cleanup of the refresh tracking data.
#
# The attribute is passed to both workers as a String, so the job arguments
# are the same type in both calls and safe to serialize as JSON.
def finalize_refresh
  redis_state do |redis|
    redis.eval(LUA_FINALIZE_REFRESH_SCRIPT, keys: [key, refresh_key, refresh_indicator_key])
  end

  record_class_name = counter_record.class.name
  record_id = counter_record.id
  attribute_name = attribute.to_s

  FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, record_class_name, record_id, attribute_name)
  ::Counters::CleanupRefreshWorker.perform_async(record_class_name, record_id, attribute_name)
end
#flushed_key ⇒ Object
182 183 184 |
# File 'lib/gitlab/counters/buffered_counter.rb', line 182

# Name of the Redis key derived from the counter key with a ":flushed" suffix.
#
# @return [String]
def flushed_key
  format('%s:flushed', key)
end
#get ⇒ Object
39 40 41 42 43 |
# File 'lib/gitlab/counters/buffered_counter.rb', line 39

# Reads the value stored at the counter key.
#
# @return [Integer] the stored value converted with to_i (0 when absent)
def get
  redis_state { |redis| redis.get(key).to_i }
end
#increment(increment) ⇒ Object
79 80 81 82 83 84 85 86 87 88 |
# File 'lib/gitlab/counters/buffered_counter.rb', line 79

# Applies a single increment through LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT,
# then schedules a flush of the buffered counter after WORKER_DELAY.
#
# @param increment [Object] increment passed to increment_args
# @return [Integer] the script result converted with to_i
def increment(increment)
  new_value = redis_state do |redis|
    script_args = increment_args(increment)
    redis.eval(LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT, **script_args).to_i
  end

  new_value.tap do
    FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, counter_record.class.name, counter_record.id, attribute.to_s)
  end
end
#initiate_refresh! ⇒ Object
111 112 113 114 115 116 117 |
# File 'lib/gitlab/counters/buffered_counter.rb', line 111

# Starts a refresh: sets the record's attribute to 0, then evaluates
# LUA_INITIATE_REFRESH_SCRIPT with the counter key and the refresh
# indicator key.
def initiate_refresh!
  counter_record.update!(attribute => 0)

  redis_state { |redis| redis.eval(LUA_INITIATE_REFRESH_SCRIPT, keys: [key, refresh_indicator_key]) }
end
#key ⇒ Object
175 176 177 178 179 180 |
# File 'lib/gitlab/counters/buffered_counter.rb', line 175

# Builds the Redis key for this counter from the record's key prefix, the
# record's class, the record's id and the attribute name.
#
# @return [String] "<prefix>:counters:<class>:<id>:<attribute>"
def key
  model = counter_record.class
  model_id = counter_record.id

  format('%s:counters:%s:%s:%s', counter_record.counters_key_prefix, model, model_id, attribute)
end
#refresh_indicator_key ⇒ Object
186 187 188 |
# File 'lib/gitlab/counters/buffered_counter.rb', line 186

# Name of the Redis key derived from the counter key with a
# ":refresh-in-progress" suffix.
#
# @return [String]
def refresh_indicator_key
  format('%s:refresh-in-progress', key)
end
#refresh_key ⇒ Object
190 191 192 |
# File 'lib/gitlab/counters/buffered_counter.rb', line 190

# Name of the Redis key derived from the counter key with a ":refresh" suffix.
#
# @return [String]
def refresh_key
  format('%s:refresh', key)
end