Class: DeprecationCollector::Storage::Redis
- Defined in:
- lib/deprecation_collector/storage.rb
Overview
storing in redis with deduplication by fingerprint
Instance Attribute Summary collapse
-
#count ⇒ Object
Returns the value of attribute count.
-
#redis ⇒ Object
Returns the value of attribute redis.
-
#write_interval ⇒ Object
Returns the value of attribute write_interval.
-
#write_interval_jitter ⇒ Object
Returns the value of attribute write_interval_jitter.
Instance Method Summary collapse
- #cleanup(&_block) ⇒ Object
- #clear(enable: false) ⇒ Object
- #delete(remove_digests) ⇒ Object
- #disable ⇒ Object
- #enable ⇒ Object
- #enabled? ⇒ Boolean
- #fetch_known_digests ⇒ Object
- #flush(force: false) ⇒ Object
- #import(dump_hash) ⇒ Object
-
#initialize(redis: nil, mutex: nil, count: false, write_interval: 900, write_interval_jitter: 60, key_prefix: nil) ⇒ Redis
constructor
A new instance of Redis.
- #key_prefix=(val) ⇒ Object
- #read_each ⇒ Object
- #read_one(digest) ⇒ Object
- #store(deprecation) ⇒ Object
- #support_disabling? ⇒ Boolean
- #unsent_deprecations ⇒ Object
Constructor Details
#initialize(redis: nil, mutex: nil, count: false, write_interval: 900, write_interval_jitter: 60, key_prefix: nil) ⇒ Redis
43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/deprecation_collector/storage.rb', line 43 def initialize(redis: nil, mutex: nil, count: false, write_interval: 900, write_interval_jitter: 60, key_prefix: nil) super @key_prefix = key_prefix || "deprecations" @redis = redis @last_write_time = current_time @count = count @write_interval = write_interval @write_interval_jitter = write_interval_jitter # on cruby hash itself is threadsafe, but we need to prevent races @deprecations_mutex = mutex || Mutex.new @deprecations = {} @known_digests = Set.new end |
Instance Attribute Details
#count ⇒ Object
Returns the value of attribute count.
41 42 43 |
# File 'lib/deprecation_collector/storage.rb', line 41 def count @count end |
#redis ⇒ Object
Returns the value of attribute redis.
41 42 43 |
# File 'lib/deprecation_collector/storage.rb', line 41 def redis @redis end |
#write_interval ⇒ Object
Returns the value of attribute write_interval.
41 42 43 |
# File 'lib/deprecation_collector/storage.rb', line 41 def write_interval @write_interval end |
#write_interval_jitter ⇒ Object
Returns the value of attribute write_interval_jitter.
41 42 43 |
# File 'lib/deprecation_collector/storage.rb', line 41 def write_interval_jitter @write_interval_jitter end |
Instance Method Details
#cleanup(&_block) ⇒ Object
167 168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/deprecation_collector/storage.rb', line 167 def cleanup(&_block) cursor = 0 removed = total = 0 loop do cursor, data_pairs = @redis.hscan(data_hash_key, cursor) # NB: some pages may be empty total += data_pairs.size removed += delete( data_pairs.to_h.select { |_digest, data| yield(JSON.parse(data, symbolize_names: true)) }.keys ) break if cursor == "0" end "#{removed} removed, #{total - removed} left" end |
#clear(enable: false) ⇒ Object
88 89 90 91 92 93 |
# File 'lib/deprecation_collector/storage.rb', line 88 def clear(enable: false) @redis.del(data_hash_key, counter_hash_key, notes_hash_key) @redis.del(enabled_flag_key) if enable @known_digests.clear @deprecations.clear end |
#delete(remove_digests) ⇒ Object
78 79 80 81 82 83 84 85 86 |
# File 'lib/deprecation_collector/storage.rb', line 78 def delete(remove_digests) return 0 unless remove_digests.any? @redis.pipelined do |pipe| pipe.hdel(data_hash_key, *remove_digests) pipe.hdel(notes_hash_key, *remove_digests) pipe.hdel(counter_hash_key, *remove_digests) if @count end.first end |
#disable ⇒ Object
74 75 76 |
# File 'lib/deprecation_collector/storage.rb', line 74 def disable @redis.set(enabled_flag_key, "false") end |
#enable ⇒ Object
70 71 72 |
# File 'lib/deprecation_collector/storage.rb', line 70 def enable @redis.set(enabled_flag_key, "true") end |
#enabled? ⇒ Boolean
66 67 68 |
# File 'lib/deprecation_collector/storage.rb', line 66 def enabled? @redis.get(enabled_flag_key) != "false" end |
#fetch_known_digests ⇒ Object
95 96 97 98 |
# File 'lib/deprecation_collector/storage.rb', line 95 def fetch_known_digests # FIXME: use `.merge!`? @known_digests.merge(@redis.hkeys(data_hash_key)) end |
#flush(force: false) ⇒ Object
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/deprecation_collector/storage.rb', line 110 def flush(force: false) return unless force || (current_time > @last_write_time + @write_interval) # do not disturb existing redis connection if already in pipeline, hope that will be flushed some other time return if in_redis_pipeline? deprecations_to_flush = nil @deprecations_mutex.synchronize do deprecations_to_flush = @deprecations @deprecations = {} @last_write_time = current_time # checking in this section to prevent multiple parallel check requests return DeprecationCollector.instance.instance_variable_set(:@enabled, false) unless enabled? end write_count_to_redis(deprecations_to_flush) if @count # make as few writes as possible, other workers may already have reported our warning fetch_known_digests deprecations_to_flush.reject! { |digest, _val| @known_digests.include?(digest) } return unless deprecations_to_flush.any? @known_digests.merge(deprecations_to_flush.keys) @redis.mapped_hmset(data_hash_key, deprecations_to_flush.transform_values(&:to_json)) end |
#import(dump_hash) ⇒ Object
163 164 165 |
# File 'lib/deprecation_collector/storage.rb', line 163 def import(dump_hash) @redis.mapped_hmset(data_hash_key, dump_hash.transform_values(&:to_json)) end |
#key_prefix=(val) ⇒ Object
181 182 183 184 |
# File 'lib/deprecation_collector/storage.rb', line 181 def key_prefix=(val) @enabled_flag_key = @data_hash_key = @counter_hash_key = @notes_hash_key = nil @key_prefix = val end |
#read_each ⇒ Object
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/deprecation_collector/storage.rb', line 135 def read_each cursor = 0 loop do cursor, data_pairs = @redis.hscan(data_hash_key, cursor) if data_pairs.any? data_pairs.zip( @redis.hmget(counter_hash_key, data_pairs.map(&:first)), @redis.hmget(notes_hash_key, data_pairs.map(&:first)) ).each do |(digest, data), count, notes| yield(digest, data, count, notes) end end break if cursor == "0" end end |
#read_one(digest) ⇒ Object
152 153 154 155 156 157 158 159 160 161 |
# File 'lib/deprecation_collector/storage.rb', line 152 def read_one(digest) [ digest, *@redis.pipelined do |pipe| pipe.hget(data_hash_key, digest) pipe.hget(counter_hash_key, digest) pipe.hget(notes_hash_key, digest) end ] end |
#store(deprecation) ⇒ Object
100 101 102 103 104 105 106 107 108 |
# File 'lib/deprecation_collector/storage.rb', line 100 def store(deprecation) fresh = !@deprecations.key?(deprecation.digest) @deprecations_mutex.synchronize do (@deprecations[deprecation.digest] ||= deprecation).touch end flush if current_time - @last_write_time > (@write_interval + rand(@write_interval_jitter)) fresh end |
#support_disabling? ⇒ Boolean
58 59 60 |
# File 'lib/deprecation_collector/storage.rb', line 58 def support_disabling? true end |
#unsent_deprecations ⇒ Object
62 63 64 |
# File 'lib/deprecation_collector/storage.rb', line 62 def unsent_deprecations @deprecations end |