Class: Gitlab::Counters::BufferedCounter

Inherits:
Object
  • Object
show all
Includes:
ExclusiveLeaseHelpers
Defined in:
lib/gitlab/counters/buffered_counter.rb

Constant Summary collapse

WORKER_DELAY =
10.minutes
WORKER_LOCK_TTL =
10.minutes
REFRESH_KEYS_TTL =

Refresh keys are given a very long TTL so that, if for any reason they are never deleted, they do not occupy Redis memory indefinitely. In practice, a refresh is not expected to take longer than this TTL.

14.days
CLEANUP_BATCH_SIZE =
50
CLEANUP_INTERVAL_SECONDS =
0.1
MAX_BITMAP_OFFSET =

Limit the bitmap offset to 2^26-1 so that a single bitmap key occupies at most ~8MB.

67108863
LUA_FLUSH_INCREMENT_SCRIPT =
"local increment_key, flushed_key = KEYS[1], KEYS[2]\nlocal increment_value = redis.call(\"get\", increment_key) or 0\nlocal flushed_value = redis.call(\"incrby\", flushed_key, increment_value)\nif flushed_value == 0 then\n  redis.call(\"del\", increment_key, flushed_key)\nelse\n  redis.call(\"del\", increment_key)\nend\nreturn flushed_value\n"
LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT =
"local counter_key, refresh_key, refresh_indicator_key = KEYS[1], KEYS[2], KEYS[3]\nlocal tracking_shard_key, opposing_tracking_shard_key, shards_key = KEYS[4], KEYS[5], KEYS[6]\n\nlocal amount, tracking_offset = tonumber(ARGV[1]), tonumber(ARGV[2])\n\n-- increment to the counter key when not refreshing\nif redis.call(\"exists\", refresh_indicator_key) == 0 then\n  return redis.call(\"incrby\", counter_key, amount)\nend\n\n-- deduplicate and increment to the refresh counter key while refreshing\nlocal found_duplicate = redis.call(\"getbit\", tracking_shard_key, tracking_offset)\nif found_duplicate == 1 then\n  return redis.call(\"get\", refresh_key)\nend\n\nredis.call(\"setbit\", tracking_shard_key, tracking_offset, 1)\nredis.call(\"expire\", tracking_shard_key, \#{REFRESH_KEYS_TTL.seconds})\nredis.call(\"sadd\", shards_key, tracking_shard_key)\nredis.call(\"expire\", shards_key, \#{REFRESH_KEYS_TTL.seconds})\n\nlocal found_opposing_change = redis.call(\"getbit\", opposing_tracking_shard_key, tracking_offset)\nlocal increment_without_previous_decrement = amount > 0 and found_opposing_change == 0\nlocal decrement_with_previous_increment = amount < 0 and found_opposing_change == 1\nlocal net_change = 0\n\nif increment_without_previous_decrement or decrement_with_previous_increment then\n  net_change = amount\nend\n\nreturn redis.call(\"incrby\", refresh_key, net_change)\n"
LUA_INITIATE_REFRESH_SCRIPT =
"local counter_key, refresh_indicator_key = KEYS[1], KEYS[2]\nredis.call(\"del\", counter_key)\nredis.call(\"set\", refresh_indicator_key, 1, \"ex\", \#{REFRESH_KEYS_TTL.seconds})\n"
LUA_FINALIZE_REFRESH_SCRIPT =
"local counter_key, refresh_key, refresh_indicator_key = KEYS[1], KEYS[2], KEYS[3]\nlocal refresh_amount = redis.call(\"get\", refresh_key) or 0\n\nredis.call(\"incrby\", counter_key, refresh_amount)\nredis.call(\"del\", refresh_indicator_key, refresh_key)\n"

Constants included from ExclusiveLeaseHelpers

ExclusiveLeaseHelpers::FailedToObtainLockError

Instance Method Summary collapse

Methods included from ExclusiveLeaseHelpers

#in_lock

Constructor Details

#initialize(counter_record, attribute) ⇒ BufferedCounter

Returns a new instance of BufferedCounter.



34
35
36
37
# File 'lib/gitlab/counters/buffered_counter.rb', line 34

# Binds the counter to one attribute of one record.
#
# Both arguments are stored as-is in instance variables; nothing is read
# from Redis or the database at construction time.
#
# @param counter_record [Object] the record whose counter is buffered
# @param attribute [Symbol, String] name of the counter attribute
#   (presumably a symbol or string; confirm against callers)
def initialize(counter_record, attribute)
  @attribute = attribute
  @counter_record = counter_record
end

Instance Method Details

#amount_to_be_flushed ⇒ Object

amount_to_be_flushed returns the total value to be flushed. The total value is the sum of the following:

  • current value in the increment_key

  • any existing value in the flushed_key that has not been flushed



169
170
171
172
173
# File 'lib/gitlab/counters/buffered_counter.rb', line 169

# Runs LUA_FLUSH_INCREMENT_SCRIPT against the increment key and the flushed key
# and returns the script's reply.
#
# Despite the getter-like name, this mutates Redis. The script adds the value
# of the increment key (0 when missing) to the flushed key and deletes the
# increment key. When the resulting total is 0 it deletes the flushed key too.
#
# @return [Object] the total held by the flushed key after the move, as
#   returned by the Redis client
def amount_to_be_flushed
  redis_state { |redis| redis.eval(LUA_FLUSH_INCREMENT_SCRIPT, keys: [key, flushed_key]) }
end

#bulk_increment(increments) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/gitlab/counters/buffered_counter.rb', line 90

# Applies several increments in one Redis pipeline and schedules a flush.
#
# Each entry is turned into script arguments by `increment_args` and evaluated
# with LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT. FlushCounterIncrementsWorker is
# then scheduled WORKER_DELAY from now for this record and attribute.
#
# @param increments [Enumerable] the increments to apply, in order
# @return [Integer] the last pipeline reply converted with `to_i`
#   (0 when the pipeline produced no replies)
def bulk_increment(increments)
  replies = redis_state do |redis|
    redis.pipelined do |pipeline|
      increments.each do |entry|
        script_args = increment_args(entry)
        pipeline.eval(LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT, **script_args)
      end
    end
  end

  FlushCounterIncrementsWorker.perform_in(
    WORKER_DELAY, counter_record.class.name, counter_record.id, attribute.to_s
  )

  replies.last.to_i
end

#cleanup_refresh ⇒ Object



137
138
139
140
141
142
143
144
145
146
# File 'lib/gitlab/counters/buffered_counter.rb', line 137

# Deletes the tracking shard keys recorded in the shards set, in batches.
#
# Up to CLEANUP_BATCH_SIZE members are popped from `shards_key` at a time and
# the keys they name are deleted. The loop pauses CLEANUP_INTERVAL_SECONDS
# between full batches and stops once a batch comes back short.
#
# @return [nil]
def cleanup_refresh
  redis_state do |redis|
    while (shards = redis.spop(shards_key, CLEANUP_BATCH_SIZE))
      # An empty batch means the set is already drained. Stop here instead of
      # issuing a DEL with no keys, which is not a valid Redis command.
      break if shards.empty?

      redis.del(*shards)
      break if shards.size < CLEANUP_BATCH_SIZE

      sleep CLEANUP_INTERVAL_SECONDS
    end
  end
end

#commit_increment! ⇒ Object



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/gitlab/counters/buffered_counter.rb', line 148

# Moves the buffered amount into the record's counter column.
#
# Under the exclusive lease, the pending amount is obtained from
# `amount_to_be_flushed`. When it is non-zero the counter column is updated,
# the flushed key is removed and the record's after-commit callbacks run.
#
# @return [Object] the attribute value read from the record after `reset`
def commit_increment!
  with_exclusive_lease do
    pending = amount_to_be_flushed

    unless pending == 0
      # The transaction lets the counter update roll back if `remove_flushed_key` fails.
      counter_record.transaction do
        counter_record.update_counters({ attribute => pending })
        remove_flushed_key
      end

      counter_record.execute_after_commit_callbacks
    end
  end

  counter_record.reset.read_attribute(attribute)
end

#finalize_refresh ⇒ Object



127
128
129
130
131
132
133
134
135
# File 'lib/gitlab/counters/buffered_counter.rb', line 127

# Ends a refresh and schedules the follow-up jobs.
#
# LUA_FINALIZE_REFRESH_SCRIPT adds the value of the refresh key (0 when
# missing) to the counter key and deletes the refresh indicator and refresh
# keys. Afterwards a delayed FlushCounterIncrementsWorker and an immediate
# Counters::CleanupRefreshWorker are enqueued for this record and attribute.
#
# @return [Object] whatever `CleanupRefreshWorker.perform_async` returns
def finalize_refresh
  redis_state do |redis|
    redis.eval(LUA_FINALIZE_REFRESH_SCRIPT, keys: [key, refresh_key, refresh_indicator_key])
  end

  record_class = counter_record.class.name
  record_id = counter_record.id
  # Job arguments must be JSON-native values, so both workers get the
  # attribute name as a String rather than a possible Symbol.
  attribute_name = attribute.to_s

  FlushCounterIncrementsWorker.perform_in(WORKER_DELAY, record_class, record_id, attribute_name)
  ::Counters::CleanupRefreshWorker.perform_async(record_class, record_id, attribute_name)
end

#flushed_key ⇒ Object



182
183
184
# File 'lib/gitlab/counters/buffered_counter.rb', line 182

# Redis key that holds the amount already taken from the increment key but
# not yet removed after a flush.
#
# @return [String] the counter key with a ":flushed" suffix
def flushed_key
  [key, "flushed"].join(":")
end

#get ⇒ Object



39
40
41
42
43
# File 'lib/gitlab/counters/buffered_counter.rb', line 39

# Reads the buffered value currently stored under the counter key.
#
# @return [Integer] the stored value converted with `to_i`
#   (0 when the key does not exist)
def get
  redis_state { |redis| redis.get(key).to_i }
end

#increment(increment) ⇒ Object



79
80
81
82
83
84
85
86
87
88
# File 'lib/gitlab/counters/buffered_counter.rb', line 79

# Applies a single increment in Redis and schedules a flush.
#
# The increment is turned into script arguments by `increment_args` and
# evaluated with LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT. A
# FlushCounterIncrementsWorker is then scheduled WORKER_DELAY from now for
# this record and attribute.
#
# @param increment [Object] the increment to apply
# @return [Integer] the script reply converted with `to_i`
def increment(increment)
  new_value = redis_state do |redis|
    reply = redis.eval(LUA_INCREMENT_WITH_DEDUPLICATION_SCRIPT, **increment_args(increment))
    reply.to_i
  end

  FlushCounterIncrementsWorker.perform_in(
    WORKER_DELAY, counter_record.class.name, counter_record.id, attribute.to_s
  )

  new_value
end

#initiate_refresh! ⇒ Object



111
112
113
114
115
116
117
# File 'lib/gitlab/counters/buffered_counter.rb', line 111

# Starts a refresh of the counter.
#
# The record's attribute is first reset to 0 with `update!`. Then
# LUA_INITIATE_REFRESH_SCRIPT deletes the counter key and sets the refresh
# indicator key with an expiry.
#
# @return [Object] the value returned by `redis_state`
def initiate_refresh!
  counter_record.update!(attribute => 0)

  redis_state { |redis| redis.eval(LUA_INITIATE_REFRESH_SCRIPT, keys: [key, refresh_indicator_key]) }
end

#key ⇒ Object



175
176
177
178
179
180
# File 'lib/gitlab/counters/buffered_counter.rb', line 175

# Redis key under which increments for this record and attribute are buffered.
#
# The key is built from the record's `counters_key_prefix`, the literal
# "counters", the record's class, its id and the attribute name, joined
# with colons.
#
# @return [String]
def key
  record = counter_record

  "#{record.counters_key_prefix}:counters:#{record.class}:#{record.id}:#{attribute}"
end

#refresh_indicator_key ⇒ Object



186
187
188
# File 'lib/gitlab/counters/buffered_counter.rb', line 186

# Redis key whose presence marks that a refresh is in progress.
#
# @return [String] the counter key with a ":refresh-in-progress" suffix
def refresh_indicator_key
  [key, "refresh-in-progress"].join(":")
end

#refresh_key ⇒ Object



190
191
192
# File 'lib/gitlab/counters/buffered_counter.rb', line 190

# Redis key that accumulates increments while a refresh is in progress.
#
# @return [String] the counter key with a ":refresh" suffix
def refresh_key
  [key, "refresh"].join(":")
end