Class: DeprecationCollector::Storage::Redis

Inherits:
Base
  • Object
show all
Defined in:
lib/deprecation_collector/storage.rb

Overview

Stores deprecations in Redis, deduplicating them by fingerprint (digest).

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(redis: nil, mutex: nil, count: false, write_interval: 900, write_interval_jitter: 60, key_prefix: nil) ⇒ Redis



43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/deprecation_collector/storage.rb', line 43

# Sets up the Redis-backed storage and its in-memory write buffer.
#
# @param redis [Object, nil] client used for every Redis command issued by this storage
# @param mutex [#synchronize, nil] lock guarding the in-memory buffer; a new Mutex is created when nil
# @param count [Boolean] when truthy, per-digest counters are also written and deleted
# @param write_interval [Numeric] base interval between buffer flushes
# @param write_interval_jitter [Numeric] upper bound passed to +rand+ when deciding to flush from #store
# @param key_prefix [String, nil] prefix for Redis key names; falls back to "deprecations"
def initialize(redis: nil, mutex: nil, count: false, write_interval: 900, write_interval_jitter: 60,
               key_prefix: nil)
  super

  # connection and key namespace
  @redis = redis
  @key_prefix = key_prefix || "deprecations"
  @last_write_time = current_time

  # flush scheduling and counting options
  @write_interval_jitter = write_interval_jitter
  @write_interval = write_interval
  @count = count

  # in-memory state; a Hash is threadsafe on CRuby by itself, the lock is there to prevent races
  @known_digests = Set.new
  @deprecations = {}
  @deprecations_mutex = mutex || Mutex.new
end

Instance Attribute Details

#count ⇒ Object

Returns the value of attribute count.



41
42
43
# File 'lib/deprecation_collector/storage.rb', line 41

# Reader for the +count+ option given to the constructor (defaults to false).
#
# @return [Object] the stored +@count+ value
def count
  @count
end

#redis ⇒ Object

Returns the value of attribute redis.



41
42
43
# File 'lib/deprecation_collector/storage.rb', line 41

# Reader for the Redis client given to the constructor.
#
# @return [Object, nil] the stored +@redis+ value
def redis
  @redis
end

#write_interval ⇒ Object

Returns the value of attribute write_interval.



41
42
43
# File 'lib/deprecation_collector/storage.rb', line 41

# Reader for the +write_interval+ option given to the constructor (defaults to 900).
#
# @return [Object] the stored +@write_interval+ value
def write_interval
  @write_interval
end

#write_interval_jitter ⇒ Object

Returns the value of attribute write_interval_jitter.



41
42
43
# File 'lib/deprecation_collector/storage.rb', line 41

# Reader for the +write_interval_jitter+ option given to the constructor (defaults to 60).
#
# @return [Object] the stored +@write_interval_jitter+ value
def write_interval_jitter
  @write_interval_jitter
end

Instance Method Details

#cleanup(&_block) ⇒ Object



167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/deprecation_collector/storage.rb', line 167

# Scans the data hash page by page and deletes every entry the block selects.
#
# @yieldparam deprecation [Hash] the stored JSON payload, parsed with symbolized keys
# @yieldreturn [Boolean] truthy when the entry should be removed
# @return [String] summary in the form "<removed> removed, <left> left"
def cleanup(&_block)
  seen = 0
  dropped = 0
  position = 0
  loop do
    # NB: a page may come back empty while the scan is still in progress
    position, page = @redis.hscan(data_hash_key, position)
    seen += page.size

    doomed = page.to_h.each_with_object([]) do |(digest, payload), picked|
      picked << digest if yield(JSON.parse(payload, symbolize_names: true))
    end
    dropped += delete(doomed)

    break if position == "0"
  end
  "#{dropped} removed, #{seen - dropped} left"
end

#clear(enable: false) ⇒ Object



88
89
90
91
92
93
# File 'lib/deprecation_collector/storage.rb', line 88

# Removes all stored deprecation data from Redis and empties the local caches.
#
# @param enable [Boolean] when truthy, the enabled-flag key is deleted as well
#   (a missing flag reads as enabled, see #enabled?)
# @return [Hash] the emptied in-memory buffer
def clear(enable: false)
  stored_hashes = [data_hash_key, counter_hash_key, notes_hash_key]
  @redis.del(*stored_hashes)
  @redis.del(enabled_flag_key) if enable

  # local state is reset as well
  @known_digests.clear
  @deprecations.clear
end

#delete(remove_digests) ⇒ Object



78
79
80
81
82
83
84
85
86
# File 'lib/deprecation_collector/storage.rb', line 78

# Deletes the given digests from the data and notes hashes (and from the counter hash when
# counting is on) in one pipelined request.
#
# @param remove_digests [Enumerable<String>] digests to remove
# @return [Integer, Object] 0 when nothing was requested, otherwise the first pipelined reply
#   (the HDEL on the data hash)
def delete(remove_digests)
  return 0 if remove_digests.none?

  affected_hashes = [data_hash_key, notes_hash_key]
  affected_hashes << counter_hash_key if @count

  replies = @redis.pipelined do |pipe|
    affected_hashes.each { |hash_key| pipe.hdel(hash_key, *remove_digests) }
  end
  replies.first
end

#disable ⇒ Object



74
75
76
# File 'lib/deprecation_collector/storage.rb', line 74

# Writes "false" into the enabled-flag key, which makes #enabled? return false.
#
# @return [Object] whatever the Redis client returns from SET
def disable
  flag_key = enabled_flag_key
  @redis.set(flag_key, "false")
end

#enable ⇒ Object



70
71
72
# File 'lib/deprecation_collector/storage.rb', line 70

# Writes "true" into the enabled-flag key.
#
# @return [Object] whatever the Redis client returns from SET
def enable
  flag_key = enabled_flag_key
  @redis.set(flag_key, "true")
end

#enabled? ⇒ Boolean



66
67
68
# File 'lib/deprecation_collector/storage.rb', line 66

# Reads the enabled flag from Redis. Anything other than the exact string "false"
# (including a missing key) counts as enabled.
#
# @return [Boolean]
def enabled?
  flag = @redis.get(enabled_flag_key)
  flag != "false"
end

#fetch_known_digests ⇒ Object



95
96
97
98
# File 'lib/deprecation_collector/storage.rb', line 95

# Adds every digest currently present in the Redis data hash to the local known-digest set.
#
# @return [Set] the updated +@known_digests+ set
def fetch_known_digests
  remote_digests = @redis.hkeys(data_hash_key)
  # Set#merge mutates the receiver in place, so there is no bang variant to switch to
  @known_digests.merge(remote_digests)
end

#flush(force: false) ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/deprecation_collector/storage.rb', line 110

# Writes the buffered deprecations to Redis.
#
# Does nothing unless +force+ is truthy or more than +@write_interval+ has passed since the
# last write, and also does nothing while +in_redis_pipeline?+ is true. Otherwise the buffer
# is swapped out under the mutex, counters are written when counting is on, and only the
# digests not already present in Redis are stored as JSON in the data hash.
#
# NOTE: when #enabled? is false, the method returns right after the swap, so the swapped-out
# buffer is not written.
#
# @param force [Boolean] skip the write-interval check
# @return [Object, nil] nil on the early exits; otherwise the result of the last call made
def flush(force: false)
  # time-based throttle, bypassed by force
  return unless force || (current_time > @last_write_time + @write_interval)
  # do not disturb an existing redis connection that is already inside a pipeline;
  # the buffer is kept and will hopefully be flushed some other time
  return if in_redis_pipeline?

  deprecations_to_flush = nil
  @deprecations_mutex.synchronize do
    # take ownership of the current buffer and start a fresh one
    deprecations_to_flush = @deprecations
    @deprecations = {}
    @last_write_time = current_time
    # checking inside this section to prevent multiple parallel check requests;
    # when disabled, the collector instance is switched off and the whole method returns here
    return DeprecationCollector.instance.instance_variable_set(:@enabled, false) unless enabled?
  end

  # counters are written for every buffered entry, before already-known digests are filtered out
  write_count_to_redis(deprecations_to_flush) if @count

  # make as few writes as possible, other workers may already have reported our warning
  fetch_known_digests
  deprecations_to_flush.reject! { |digest, _val| @known_digests.include?(digest) }
  return unless deprecations_to_flush.any?

  # digests are recorded as known before the write is issued
  @known_digests.merge(deprecations_to_flush.keys)
  @redis.mapped_hmset(data_hash_key, deprecations_to_flush.transform_values(&:to_json))
end

#import(dump_hash) ⇒ Object



163
164
165
# File 'lib/deprecation_collector/storage.rb', line 163

# Writes a previously dumped set of deprecations into the Redis data hash, one JSON value
# per digest.
#
# An empty dump is skipped: HMSET needs at least one field/value pair, so forwarding an
# empty hash would make Redis reject the command. #flush guards the same call the same way.
#
# @param dump_hash [Hash] digest => deprecation payload (each value must respond to +to_json+)
# @return [Object, nil] the reply of the Redis client, or nil when there was nothing to import
def import(dump_hash)
  return if dump_hash.empty?

  @redis.mapped_hmset(data_hash_key, dump_hash.transform_values(&:to_json))
end

#key_prefix=(val) ⇒ Object



181
182
183
184
# File 'lib/deprecation_collector/storage.rb', line 181

# Changes the key prefix and resets the four cached key-name variables to nil
# (presumably so the key helpers rebuild them with the new prefix; the helpers are not shown here).
#
# @param val [String] new prefix
# @return [String] the assigned prefix
def key_prefix=(val)
  %i[@enabled_flag_key @data_hash_key @counter_hash_key @notes_hash_key].each do |cached_key|
    instance_variable_set(cached_key, nil)
  end
  @key_prefix = val
end

#read_each ⇒ Object



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/deprecation_collector/storage.rb', line 135

# Iterates over every stored deprecation, scanning the data hash page by page and looking up
# the matching counter and notes values for each page.
#
# @yieldparam digest [String] hash field name
# @yieldparam data [String] raw value from the data hash
# @yieldparam count [Object, nil] value from the counter hash for the same digest
# @yieldparam notes [Object, nil] value from the notes hash for the same digest
# @return [nil]
def read_each
  position = 0
  loop do
    position, page = @redis.hscan(data_hash_key, position)

    if page.any?
      digests = page.map(&:first)
      counts = @redis.hmget(counter_hash_key, digests)
      notes_list = @redis.hmget(notes_hash_key, digests)

      page.zip(counts, notes_list).each do |(digest, data), count, notes|
        yield(digest, data, count, notes)
      end
    end

    break if position == "0"
  end
end

#read_one(digest) ⇒ Object



152
153
154
155
156
157
158
159
160
161
# File 'lib/deprecation_collector/storage.rb', line 152

# Fetches the data, counter and notes values for a single digest in one pipelined request.
#
# @param digest [String] hash field to look up
# @return [Array] the digest followed by the pipelined replies (data, count, notes)
def read_one(digest)
  replies = @redis.pipelined do |pipe|
    [data_hash_key, counter_hash_key, notes_hash_key].each do |hash_key|
      pipe.hget(hash_key, digest)
    end
  end
  [digest, *replies]
end

#store(deprecation) ⇒ Object



100
101
102
103
104
105
106
107
108
# File 'lib/deprecation_collector/storage.rb', line 100

# Buffers a deprecation in memory and triggers a flush when the write interval (plus a random
# jitter) has elapsed since the last write.
#
# The "is this digest new?" check runs inside the same critical section as the write, so it
# cannot race with another thread storing the same digest or with #flush swapping the buffer.
#
# @param deprecation [#digest, #touch] deprecation to record; an already buffered entry with the
#   same digest is kept and touched instead
# @return [Boolean] true when the digest was not in the buffer before this call
def store(deprecation)
  fresh = nil
  @deprecations_mutex.synchronize do
    fresh = !@deprecations.key?(deprecation.digest)
    (@deprecations[deprecation.digest] ||= deprecation).touch
  end

  # the jitter keeps workers from all flushing at the same moment
  flush if current_time - @last_write_time > (@write_interval + rand(@write_interval_jitter))
  fresh
end

#support_disabling? ⇒ Boolean



58
59
60
# File 'lib/deprecation_collector/storage.rb', line 58

# Always true for this storage, which implements #enable, #disable and #enabled?
# on top of a flag kept in Redis.
#
# @return [Boolean] always true
def support_disabling?
  true
end

#unsent_deprecations ⇒ Object



62
63
64
# File 'lib/deprecation_collector/storage.rb', line 62

# Returns the in-memory buffer that #store fills and #flush swaps out, i.e. the deprecations
# not yet handed to a flush. This is the live Hash, not a copy.
#
# @return [Hash] digest => deprecation
def unsent_deprecations
  @deprecations
end