Class: Redis::Ick
- Inherits:
-
Object
- Object
- Redis::Ick
- Defined in:
- lib/redis/ick.rb,
lib/redis/ick/version.rb
Overview
Binds Lua code to provide the Ick operations in Redis.
Constant Summary collapse
- LUA_ICK_PREFIX =
LUA_ICK_PREFIX
A snippet of Lua code which is common to all the Ick scripts.
For convenience and to avoid repeating code, we set up some computed key names.
For safety, we check that the ick_ver, ick_pset, and ick_cset either do not exist or exist with the correct types and values to be identifiable as an Ick.
All scripts in the LUA_ICK series expect only one KEYS, the root key of the Ick data structure. We expect a version flag as a string at this key. Keys for other data are computed from KEYS in such a way as to guarantee they all hash to the same slot.
# Lua prologue shared by every LUA_ICK* script.
#
# Expects exactly one entry in KEYS: the root key of the Ick, which holds
# the version string.  The pset and cset key names are derived from that
# root key, and the snippet leaves these locals in scope for the code
# appended after it: ick_key, ick_ver, ick_pset_key, ick_cset_key,
# ick_ver_type, ick_pset_type, ick_cset_type.
#
# The snippet returns a Redis error reply when any of the three keys holds
# an unexpected type, when the version string is not 'ick.v1', or when a
# pset or cset exists without a version key.
#
# Fixes relative to the previous text:
# - the defense messages referenced an undefined Lua variable
#   (ick_ver_key); they now use ick_key, the key that holds the version.
# - the zset checks reported "expected string"; they now say "expected zset".
# - the type of the root key is checked before GET is issued, so a root
#   key of the wrong type produces the defense message rather than a
#   WRONGTYPE error raised by GET itself.
%{
  local ick_key        = KEYS[1]
  local ick_pset_key   = ick_key .. '/ick/{' .. ick_key .. '}/pset'
  local ick_cset_key   = ick_key .. '/ick/{' .. ick_key .. '}/cset'
  local ick_ver_type   = redis.call('TYPE',ick_key).ok
  local ick_pset_type  = redis.call('TYPE',ick_pset_key).ok
  local ick_cset_type  = redis.call('TYPE',ick_cset_key).ok
  -- Check the root key type before reading it with GET.
  if ('none' ~= ick_ver_type and 'string' ~= ick_ver_type) then
    return redis.error_reply('ick defense: expected string at ' .. ick_key .. ', found ' .. ick_ver_type)
  end
  local ick_ver        = redis.call('GET',ick_key)
  if (false ~= ick_ver and 'ick.v1' ~= ick_ver) then
    return redis.error_reply('unrecognized ick version ' .. ick_ver)
  end
  if ('none' ~= ick_pset_type and 'zset' ~= ick_pset_type) then
    return redis.error_reply('ick defense: expected zset at ' .. ick_pset_key .. ', found ' .. ick_pset_type)
  end
  if ('none' ~= ick_cset_type and 'zset' ~= ick_cset_type) then
    return redis.error_reply('ick defense: expected zset at ' .. ick_cset_key .. ', found ' .. ick_cset_type)
  end
  -- A pset or cset without a version key is not a recognizable Ick.
  if ('none' == ick_ver_type) then
    if ('none' ~= ick_pset_type) then
      return redis.error_reply('ick defense: no ver at ' .. ick_key .. ', but found pset at ' .. ick_pset_key)
    end
    if ('none' ~= ick_cset_type) then
      return redis.error_reply('ick defense: no ver at ' .. ick_key .. ', but found cset at ' .. ick_cset_key)
    end
  end
}.freeze
- LUA_ICKDEL =
# Lua for ICKDEL: after the shared LUA_ICK_PREFIX checks, issues a single
# DEL over the version key, the pset key, and the cset key, and returns
# the reply of that DEL.
(LUA_ICK_PREFIX + %{ return redis.call('DEL',ick_key,ick_pset_key,ick_cset_key) }).freeze
- LUA_ICKSTATS =
LUA_ICKSTATS
KEYS, or nil if none.
Note: At redis.io/commands/eval, the “Lua to Redis conversion table” states:
Lua number -> Redis integer reply (the number is converted into an integer) ...If you want to return a float from Lua you should return it as a string. We follow this recommendation in our Lua below where we convert our numeric responses to strings with “tostring(tonumber(n))”.
# Lua for ICKSTATS.  Runs after the shared LUA_ICK_PREFIX checks.
#
# Returns nil when GET on the root key found no version string.  Otherwise
# returns a flat table of alternating names and values:
#   'key', 'keys' (a nested table of the three key names), 'ver',
#   'cset_size', 'pset_size', 'total_size'
# followed, only when the corresponding set is non-empty, by
#   'pset_min'/'pset_max', 'cset_min'/'cset_max', 'total_min'/'total_max'.
# The min/max values are read with ZRANGE ... WITHSCORES at index 0 and -1
# and are emitted as strings via tostring(tonumber(n)).
(LUA_ICK_PREFIX + %{ if (false == ick_ver) then return nil end local ick_pset_size = redis.call('ZCARD',ick_pset_key) local ick_cset_size = redis.call('ZCARD',ick_cset_key) local ick_stats = { 'key', ick_key, 'keys', { ick_key, ick_pset_key, ick_cset_key }, 'ver', ick_ver, 'cset_size', ick_cset_size, 'pset_size', ick_pset_size, 'total_size', ick_cset_size + ick_pset_size, } local pset_min = nil local pset_max = nil if ick_pset_size > 0 then pset_min = redis.call('ZRANGE',ick_pset_key, 0, 0,'WITHSCORES')[2] table.insert(ick_stats, 'pset_min') table.insert(ick_stats, tostring(tonumber(pset_min))) pset_max = redis.call('ZRANGE',ick_pset_key,-1,-1,'WITHSCORES')[2] table.insert(ick_stats, 'pset_max') table.insert(ick_stats, tostring(tonumber(pset_max))) end local cset_min = nil local cset_max = nil if ick_cset_size > 0 then cset_min = redis.call('ZRANGE',ick_cset_key, 0, 0,'WITHSCORES')[2] table.insert(ick_stats, 'cset_min') table.insert(ick_stats, tostring(tonumber(cset_min))) cset_max = redis.call('ZRANGE',ick_cset_key,-1,-1,'WITHSCORES')[2] table.insert(ick_stats, 'cset_max') table.insert(ick_stats, tostring(tonumber(cset_max))) end local total_min = nil if pset_min and cset_min then total_min = math.min(cset_min,pset_min) elseif pset_min then total_min = pset_min elseif cset_min then total_min = cset_min end if total_min then table.insert(ick_stats, 'total_min') table.insert(ick_stats, tostring(tonumber(total_min))) end local total_max = nil if pset_max and cset_max then total_max = math.max(cset_max,pset_max) elseif pset_max then total_max = pset_max elseif cset_max then total_max = cset_max end if total_max then table.insert(ick_stats, 'total_max') table.insert(ick_stats, tostring(tonumber(total_max))) end return ick_stats }).freeze
- LUA_ICKADD =
LUA_ICKADD
Adds members to the pset as per ZADD. Where a member is re-written, we always take the lowest score.
Thus, scores are only allowed to move downward. changes to score.
Creates the Ick if necessary.
# Lua for ICKADD.  Runs after the shared LUA_ICK_PREFIX checks.
#
# ARGV is read as score,member pairs; an odd number of arguments yields an
# error reply.  For each pair the member's current pset score is looked up
# with ZSCORE: an absent member is added (counted in num_new), and an
# existing member is rewritten only when the new score is strictly lower
# (counted in num_changed).  The version key is then set with SETNX, and
# the script returns the table { num_new, num_changed }.
(LUA_ICK_PREFIX + %{ local num_args = table.getn(ARGV) if 1 == (num_args % 2) then return redis.error_reply("odd number of arguments for 'ickadd' command") end local num_new = 0 local num_changed = 0 for i = 1,num_args,2 do local score = tonumber(ARGV[i]) local member = ARGV[i+1] local old_score = redis.call('ZSCORE',ick_pset_key,member) if false == old_score then redis.call('ZADD',ick_pset_key,score,member) num_new = num_new + 1 elseif score < tonumber(old_score) then redis.call('ZADD',ick_pset_key,score,member) num_changed = num_changed + 1 end end redis.call('SETNX', ick_key, 'ick.v1') return { num_new, num_changed } }).freeze
- LUA_ICKRESERVE =
LUA_ICKRESERVE
Tops up the cset to at most size ARGV[1] by shifting the lowest-scored members over from the pset.
The cset might already be full, in which case we may shift fewer than ARGV[1] elements.
The same score-folding happens as per ICKADD. Thus where there are duplicate messages, we may remove more members from the pset than we add to the cset.
size for cset and to be returned
# Lua for ICKRESERVE.  Runs after the shared LUA_ICK_PREFIX checks.
#
# ARGV[1] is the target cset size.  The loop stops once ZCARD of the cset
# reaches the target or the pset is empty.  Each iteration removes the
# first (lowest-scored) member from the pset and writes it to the cset
# only when the member is absent there or its pset score is strictly
# lower than its current cset score.  The version key is then set with
# SETNX.  Returns an empty table when the target is <= 0, otherwise the
# reply of ZRANGE cset 0 (target - 1) WITHSCORES.
(LUA_ICK_PREFIX + %{ local target_cset_size = tonumber(ARGV[1]) while true do local ick_cset_size = redis.call('ZCARD',ick_cset_key) if ick_cset_size and target_cset_size <= ick_cset_size then break end local first_in_pset = redis.call('ZRANGE',ick_pset_key,0,0,'WITHSCORES') if 0 == table.getn(first_in_pset) then break end local first_member = first_in_pset[1] local first_score = tonumber(first_in_pset[2]) redis.call('ZREM',ick_pset_key,first_member) local old_score = redis.call('ZSCORE',ick_cset_key,first_member) if false == old_score or first_score < tonumber(old_score) then redis.call('ZADD',ick_cset_key,first_score,first_member) end end redis.call('SETNX', ick_key, 'ick.v1') if target_cset_size <= 0 then return {} else local max = target_cset_size - 1 return redis.call('ZRANGE',ick_cset_key,0,max,'WITHSCORES') end }).freeze
- LUA_ICKCOMMIT =
LUA_ICKCOMMIT
Removes specified members from the cset.
Note: This Lua unpacks ARGV with the iterator ipairs() instead of unpack() to avoid a “too many results to unpack” failure at 8000 args. However, the loop over many redis.call is regrettably heavy-weight. From a performance standpoint it would be preferable to call ZREM in larger batches.
# Lua for ICKCOMMIT.  Runs after the shared LUA_ICK_PREFIX checks.
#
# Sets the version key with SETNX, then returns 0 when ARGV is empty.
# Otherwise issues one ZREM against the cset per ARGV entry and returns
# the sum of the ZREM replies.
(LUA_ICK_PREFIX + %{ redis.call('SETNX', ick_key, 'ick.v1') if 0 == table.getn(ARGV) then return 0 end local num_removed = 0 for i,v in ipairs(ARGV) do num_removed = num_removed + redis.call('ZREM',ick_cset_key,v) end return num_removed }).freeze
- VERSION =
Version plan/history:
0.0.1 - Still in Prosperworks/ALI/vendor/gems/redis-ick.
0.0.2 - Broke out into Prosperworks/redis-ick, make public.
0.0.3 - Got .travis.yml working with a live redis-server.
Runtime dependency on redis-script_manager for Ick._eval. Initial Rubocop integration. Misc cleanup.
0.0.4 - Imported text from original design doc to README.md, polish. Rubocop polish and defiance. Development dependency on redis-key_hash to test prescriptive hash claims. Identified limits of prescriptive hash robustness.
0.1.0 - (future) Big README.md and Rdoc update, solicit feedback from select external beta users.
0.2.0 - (future) Incorporate feedback, announce.
'0.0.4'.freeze
Instance Attribute Summary collapse
-
#redis ⇒ Object
Returns the value of attribute redis.
-
#statsd ⇒ Object
Returns the value of attribute statsd.
Class Method Summary collapse
-
._floatify(str) ⇒ Object
Converts a string str into a Float, and recognizes ‘inf’, ‘-inf’, etc.
- ._postprocess_ickstats_results(raw_ickstats_results) ⇒ Object
Instance Method Summary collapse
-
#_eval(lua, ick_key, *args) ⇒ Object
Runs the specified lua in the redis against the specified Ick.
-
#_statsd_increment(metric) ⇒ Object
Reports a single count on the requested metric to statsd (if any).
-
#_statsd_time(metric) ⇒ Object
Executes the block (if any) and reports its timing in milliseconds on the requested metric to statsd (if any).
-
#_statsd_timing(metric, time) ⇒ Object
Reports the specified timing on the requested metric to statsd (if any).
-
#ickadd(ick_key, *score_member_pairs) ⇒ Object
Adds all the specified members with the specified scores to the Ick stored at key.
-
#ickcommit(ick_key, *members) ⇒ Object
Removes the indicated members from the consumer set, if present.
-
#ickdel(ick_key) ⇒ Object
Removes all data associated with the Ick in Redis at key.
-
#ickreserve(ick_key, max_size = 0) ⇒ Object
Tops up the consumer set up to max_size by shifting the lowest-scored elements from the producer set into the consumer set until the consumer set cardinality reaches max_size or the producer set is exhausted.
-
#ickstats(ick_key) ⇒ Object
Fetches stats.
-
#initialize(redis, statsd: nil) ⇒ Ick
constructor
Creates an Ick accessor.
Constructor Details
#initialize(redis, statsd: nil) ⇒ Ick
Creates an Ick accessor.
to :increment and :timing.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/redis/ick.rb', line 19

# Creates an Ick accessor.
#
# @param redis [Redis] the connection that later calls operate on
# @param statsd [Object, nil] optional metrics sink; when given it must
#   respond to :increment, :timing and :time
# @raise [ArgumentError] if redis is not a Redis, or if statsd is given
#   and lacks one of the required methods
def initialize(redis, statsd: nil)
  if !redis.is_a?(Redis)
    raise ArgumentError, "not a Redis: #{redis}"
  end
  if statsd
    # Checked in this order so the first missing method is the one reported.
    # The :timing message previously read 'no statsd.timeing'.
    [:increment, :timing, :time].each do |required|
      if !statsd.respond_to?(required)
        raise ArgumentError, "no statsd.#{required}"
      end
    end
  end
  @redis  = redis
  @statsd = statsd
end
Instance Attribute Details
#redis ⇒ Object
Returns the value of attribute redis.
38 39 40 |
# File 'lib/redis/ick.rb', line 38

# Reader for the value stored in @redis.
def redis
  return @redis
end
#statsd ⇒ Object
Returns the value of attribute statsd.
39 40 41 |
# File 'lib/redis/ick.rb', line 39

# Reader for the value stored in @statsd.
def statsd
  return @statsd
end
Class Method Details
._floatify(str) ⇒ Object
Converts a string str into a Float, and recognizes ‘inf’, ‘-inf’, etc.
So we can be certain of compatibility, this was stolen with tweaks from github.com/redis/redis-rb/blob/master/lib/redis.rb.
315 316 317 318 319 320 321 322 |
# File 'lib/redis/ick.rb', line 315

# Parses a String into a Float.
#
# A string beginning (case-insensitively) with "inf" or "-inf" maps to
# positive or negative infinity; anything else is handed to Kernel#Float.
#
# @param str [String]
# @return [Float]
# @raise [ArgumentError] if str is not a String (Kernel#Float may also
#   raise for text it cannot parse)
def self._floatify(str)
  unless str.is_a?(String)
    raise ArgumentError, "not String: #{str}"
  end
  infinite = /^(-)?inf/i.match(str)
  return Float(str) unless infinite
  # Capture group 1 is the optional leading minus sign.
  infinite[1] ? -Float::INFINITY : Float::INFINITY
end
._postprocess_ickstats_results(raw_ickstats_results) ⇒ Object
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/redis/ick.rb', line 131

# Converts the raw reply of LUA_ICKSTATS into a Hash.
#
# LUA_ICKSTATS returns a bulk data response [k,v,k,v,...].  Per the
# "Lua to Redis conversion table" at http://redis.io/commands/eval, a Lua
# number is converted into an integer reply, so the script returns its
# *_min and *_max values as strings.  That conversion is reversed here.
#
# Each *_min / *_max string is converted as follows:
# - all digits          -> Integer
# - "inf" / "-inf"      -> Float::INFINITY / -Float::INFINITY
#   (previously these went through String#to_f and became 0.0)
# - anything else       -> String#to_f
#
# @param raw_ickstats_results [Array, nil] flat [k,v,k,v,...] reply
# @return [Hash, nil] nil when the raw reply is nil or false
def self._postprocess_ickstats_results(raw_ickstats_results)
  return nil if !raw_ickstats_results
  stats = Hash[*raw_ickstats_results]
  stats.keys.grep(/_(min|max)\z/).each do |k|
    text = stats[k]
    next if !text
    stats[k] =
      if /\A\d+\z/ =~ text
        text.to_i
      elsif (inf = /\A(-)?inf/i.match(text))
        # Same infinity recognition as _floatify, kept lenient for other text.
        inf[1] ? -Float::INFINITY : Float::INFINITY
      else
        text.to_f
      end
  end
  stats
end
Instance Method Details
#_eval(lua, ick_key, *args) ⇒ Object
Runs the specified lua in the redis against the specified Ick.
326 327 328 329 330 331 332 333 334 |
# File 'lib/redis/ick.rb', line 326

# Evaluates a Lua script against one Ick.
#
# The Ick's root key is passed as the only KEYS entry and the remaining
# arguments as ARGV, via Redis::ScriptManager.eval_gently.
#
# @param lua [String] the Lua source
# @param ick_key [String] root key of the Ick
# @param args [Array] values passed to the script as ARGV
# @raise [ArgumentError] if lua or ick_key is not a String
def _eval(lua,ick_key,*args)
  raise ArgumentError, "bogus non-String lua #{lua}" unless lua.is_a?(String)
  unless ick_key.is_a?(String)
    raise ArgumentError, "bogus non-String ick_key #{ick_key}"
  end
  keys = [ick_key]
  Redis::ScriptManager.eval_gently(redis, lua, keys, args)
end
#_statsd_increment(metric) ⇒ Object
Reports a single count on the requested metric to statsd (if any).
45 46 47 |
# File 'lib/redis/ick.rb', line 45

# Sends one increment for metric to statsd; does nothing and returns nil
# when no statsd is configured.
def _statsd_increment(metric)
  sink = statsd
  return nil unless sink
  sink.increment(metric)
end
#_statsd_time(metric) ⇒ Object
Executes the block (if any) and reports its timing in milliseconds on the requested metric to statsd (if any).
65 66 67 68 69 70 71 72 73 |
# File 'lib/redis/ick.rb', line 65

# Runs the given block (if any), wrapping it in statsd.time(metric) when a
# statsd is configured.
#
# Without statsd the block's value is returned (nil when no block is
# given).  With statsd the return value is whatever statsd.time returns.
def _statsd_time(metric)
  unless statsd
    return nil unless block_given?
    return yield
  end
  statsd.time(metric) { block_given? ? yield : nil }
end
#_statsd_timing(metric, time) ⇒ Object
Reports the specified timing on the requested metric to statsd (if any).
54 55 56 |
# File 'lib/redis/ick.rb', line 54

# Forwards (metric, time) to statsd.timing; does nothing and returns nil
# when no statsd is configured.
def _statsd_timing(metric,time)
  sink = statsd
  return nil unless sink
  sink.timing(metric, time)
end
#ickadd(ick_key, *score_member_pairs) ⇒ Object
Adds all the specified members with the specified scores to the Ick stored at key.
Entries are stored in order by score. Lower-scored entries will pop out in reserve before higher-scored entries. Re-adding an entry which already exists in the producer set with a new score results in the entry having the lowest of the old and new scores.
Similar to redis.io/commands/zadd with a modified NX option, operating on the producer set.
Usage:
ick.ickadd(ick_key,score,member[,score,member]*)
Suggested usage is for scores to be a Unix timestamp indicating when something became dirty.
changed scores.
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
# File 'lib/redis/ick.rb', line 182

# Validates the arguments, reports call metrics, and evaluates LUA_ICKADD
# against ick_key with the score/member pairs as script arguments.
#
# @param ick_key [String] root key of the Ick
# @param score_member_pairs [Array] alternating Numeric score and String
#   member values
# @raise [ArgumentError] on a non-String key, an odd number of pair
#   values, a non-Numeric score, or a non-String member
def ickadd(ick_key,*score_member_pairs)
  unless ick_key.is_a?(String)
    raise ArgumentError, "bogus non-String ick_key #{ick_key}"
  end
  unless score_member_pairs.size.even?
    raise ArgumentError, "bogus odd-numbered #{score_member_pairs}"
  end
  score_member_pairs.each_slice(2) do |score, member|
    unless score.is_a?(Numeric)
      raise ArgumentError, "bogus non-Numeric score #{score}"
    end
    unless member.is_a?(String)
      raise ArgumentError, "bogus non-String member #{member}"
    end
  end
  pair_count = score_member_pairs.size / 2
  _statsd_increment('profile.ick.ickadd.calls')
  _statsd_timing('profile.ick.ickadd.pairs', pair_count)
  _statsd_time('profile.ick.time.ickadd') do
    _eval(LUA_ICKADD, ick_key, *score_member_pairs)
  end
end
#ickcommit(ick_key, *members) ⇒ Object
Removes the indicated members from the consumer set, if present.
Similar to ZREM ick_key [member]*, per redis.io/commands/zrem, operating on the consumer set only.
Usage:
ick.ickcommit(ick_key,memberA,memberB,...)
Committed elements are meant to represent consumer work-completed.
consumer set, not including non-existing members.
298 299 300 301 302 303 304 305 306 307 |
# File 'lib/redis/ick.rb', line 298

# Validates the key, reports call metrics, and evaluates LUA_ICKCOMMIT
# against ick_key with the members as script arguments.
#
# @param ick_key [String] root key of the Ick
# @param members [Array] members passed to the script
# @raise [ArgumentError] if ick_key is not a String
def ickcommit(ick_key,*members)
  unless ick_key.is_a?(String)
    raise ArgumentError, "bogus non-String ick_key #{ick_key}"
  end
  member_count = members.size
  _statsd_increment('profile.ick.ickcommit.calls')
  _statsd_timing('profile.ick.ickcommit.members', member_count)
  _statsd_time('profile.ick.time.ickcommit') { _eval(LUA_ICKCOMMIT, ick_key, *members) }
end
#ickdel(ick_key) ⇒ Object
Removes all data associated with the Ick in Redis at key.
Similar to DEL key, redis.io/commands/del, but may delete multiple keys which together implement the Ick data structure.
be >= 1 if an Ick existed at key.
86 87 88 89 90 91 92 93 94 |
# File 'lib/redis/ick.rb', line 86

# Validates the key, reports call metrics, and evaluates LUA_ICKDEL
# against ick_key.
#
# NOTE(review): the timing metric here is 'profile.ick.ickdel.time' while
# the other operations use 'profile.ick.time.<op>' -- confirm whether the
# difference is intentional before renaming.
#
# @param ick_key [String] root key of the Ick
# @raise [ArgumentError] if ick_key is not a String
def ickdel(ick_key)
  unless ick_key.is_a?(String)
    raise ArgumentError, "bogus non-String ick_key #{ick_key}"
  end
  _statsd_increment('profile.ick.ickdel.calls')
  _statsd_time('profile.ick.ickdel.time') { _eval(LUA_ICKDEL, ick_key) }
end
#ickreserve(ick_key, max_size = 0) ⇒ Object
Tops up the consumer set up to max_size by shifting the lowest-scored elements from the producer set into the consumer set until the consumer set cardinality reaches max_size or the producer set is exhausted.
The reserved elements are meant to represent consumer work-in-progress. If they are not committed, they will be returned again in future calls to ickreserve.
Note that the Lua for ick is irritating like so:
- you add in the pattern [ score_number, member_string, ... ]
- you retrieve in the pattern [ member_string, score_string, ... ]
Native ZADD and ZRANGE WITHSCORES exhibit this same irritating inconsistency: Ick is annoyance-compatible with Redis sorted sets.
However, by analogy with redis-rb’s Redis.current.zrange(), this Ruby wrapper method pairs up the results for you, and converts the string scores to floats.
- you get from this method [[ member_string, score_number] , ... ]
Redis.current.zrange() withscores: [ member_string, score_number ] representing the lowest-scored elements from the producer set.
236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 |
# File 'lib/redis/ick.rb', line 236

# Validates the arguments, reports call metrics, evaluates LUA_ICKRESERVE
# against ick_key, and pairs up the flat [member, score, ...] reply into
# [[member, Float score], ...].
#
# When the raw reply is a Redis::Future, the future's #value is overridden
# on its singleton class to apply the same pairing lazily, and the future
# itself is returned.
#
# @param ick_key [String] root key of the Ick
# @param max_size [Integer] non-negative size passed to the script
# @raise [ArgumentError] on a non-String key, a non-Integer max_size, or
#   a negative max_size
def ickreserve(ick_key,max_size=0)
  unless ick_key.is_a?(String)
    raise ArgumentError, "bogus non-String ick_key #{ick_key}"
  end
  unless max_size.is_a?(Integer)
    raise ArgumentError, "bogus non-Integer max_size #{max_size}"
  end
  raise ArgumentError, "bogus negative #{max_size}" if max_size < 0
  _statsd_increment('profile.ick.ickreserve.calls')
  _statsd_timing('profile.ick.ickreserve.max_size', max_size)
  reply = nil
  # Assigned inside the block: the return value of _statsd_time is not
  # relied on to carry the script reply.
  _statsd_time('profile.ick.time.ickreserve') do
    reply = _eval(LUA_ICKRESERVE, ick_key, max_size)
  end
  unless reply.is_a?(Redis::Future)
    results = reply.each_slice(2).map do |member, score|
      [member, ::Redis::Ick._floatify(score)]
    end
    _statsd_timing('profile.ick.ickreserve.num_results', results.size)
    return results
  end
  # Extend this one Redis::Future with a continuation that post-processes
  # its eventual value.
  class << reply
    alias_method :original_value, :value
    def value
      original_value.each_slice(2).map do |member, score|
        [member, ::Redis::Ick._floatify(score)]
      end
    end
  end
  reply
end
#ickstats(ick_key) ⇒ Object
Fetches stats.
about the Ick at ick_key, if any, else nil. If called within a pipeline, returns a redis::Future whose value is a Hash or nil as before.
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/redis/ick.rb', line 105

# Validates the key, reports call metrics, evaluates LUA_ICKSTATS against
# ick_key, and post-processes the reply with
# Redis::Ick._postprocess_ickstats_results.
#
# When the raw reply is a Redis::Future, the future's #value is overridden
# on its singleton class to apply the same post-processing lazily, and the
# future itself is returned.
#
# @param ick_key [String] root key of the Ick
# @raise [ArgumentError] if ick_key is not a String
def ickstats(ick_key)
  unless ick_key.is_a?(String)
    raise ArgumentError, "bogus non-String ick_key #{ick_key}"
  end
  _statsd_increment('profile.ick.ickstats.calls')
  reply = nil
  # Assigned inside the block: the return value of _statsd_time is not
  # relied on to carry the script reply.
  _statsd_time('profile.ick.time.ickstats') do
    reply = _eval(LUA_ICKSTATS, ick_key)
  end
  unless reply.is_a?(Redis::Future)
    return ::Redis::Ick._postprocess_ickstats_results(reply)
  end
  # Extend this one Redis::Future with a continuation that post-processes
  # its eventual value.
  class << reply
    alias_method :original_value, :value
    def value
      ::Redis::Ick._postprocess_ickstats_results(original_value)
    end
  end
  reply
end