Class: Redis::Ick
- Inherits:
-
Object
- Object
- Redis::Ick
- Defined in:
- lib/redis/ick.rb,
lib/redis/ick/version.rb
Overview
Accessor for Ick data structures in Redis.
Defined Under Namespace
Classes: FutureContinuation
Constant Summary collapse
- Skip0ThenFloatifyPairs =
Postprocessing done on the LUA_ICKEXCHANGE results for both ickreserve and ickexchange.
results are num_committed followed by N message-and-score pairs.
We slice results to skip the first element, num_committed.
On the rest, we floatify the scores to convert from Redis number-as-string limitation to Ruby Floats.
This is similar to Redis::FloatifyPairs:
github.com/redis/redis-rb/blob/master/lib/redis.rb#L2887-L2896
# Drops the leading num_committed element, then turns the remaining flat
# [member, score_string, ...] list into [member, Float] pairs.
lambda { |results|
  results[1..-1].each_slice(2).map { |member, score| [member, ::Redis::Ick._floatify(score)] }
}
# LUA_ICK_PREFIX
#
# A snippet of Lua code which is common to all the Ick scripts.
#
# For convenience and to avoid repeating code, we set up some computed
# key names.
#
# For safety, we check that the ick_ver, ick_pset, and ick_cset either
# do not exist or exist with the correct types and values to be
# identifiable as an Ick.
#
# All scripts in the LUA_ICK series expect only one KEYS, the root key
# of the Ick data structure.  We expect a version flag as a string at
# this key.  Keys for other data are computed from KEYS in such a way
# as to guarantee they all hash to the same slot.
#
# The version string lives at ick_key itself, so the defensive error
# messages report ick_key for version problems (there is no separate
# version key variable), and the pset/cset type checks report the type
# they actually require, a zset.
LUA_ICK_PREFIX = %{
  local ick_key        = KEYS[1]
  local ick_ver        = redis.call('GET',ick_key)
  local ick_pset_key   = ick_key .. '/ick/{' .. ick_key .. '}/pset'
  local ick_cset_key   = ick_key .. '/ick/{' .. ick_key .. '}/cset'
  local ick_ver_type   = redis.call('TYPE',ick_key).ok
  local ick_pset_type  = redis.call('TYPE',ick_pset_key).ok
  local ick_cset_type  = redis.call('TYPE',ick_cset_key).ok
  if (false ~= ick_ver and 'ick.v1' ~= ick_ver) then
    return redis.error_reply('unrecognized ick version ' .. ick_ver)
  end
  if ('none' ~= ick_ver_type and 'string' ~= ick_ver_type) then
    return redis.error_reply('ick defense: expected string at ' .. ick_key .. ', found ' .. ick_ver_type)
  end
  if ('none' ~= ick_pset_type and 'zset' ~= ick_pset_type) then
    return redis.error_reply('ick defense: expected zset at ' .. ick_pset_key .. ', found ' .. ick_pset_type)
  end
  if ('none' ~= ick_cset_type and 'zset' ~= ick_cset_type) then
    return redis.error_reply('ick defense: expected zset at ' .. ick_cset_key .. ', found ' .. ick_cset_type)
  end
  if ('none' == ick_ver_type) then
    if ('none' ~= ick_pset_type) then
      return redis.error_reply('ick defense: no ver at ' .. ick_key .. ', but found pset at ' .. ick_pset_key)
    end
    if ('none' ~= ick_cset_type) then
      return redis.error_reply('ick defense: no ver at ' .. ick_key .. ', but found cset at ' .. ick_cset_key)
    end
  end
}.freeze
- LUA_ICKDEL =
# Script body appended to the shared prefix: a single DEL covering the
# root (version) key, the pset key and the cset key; the script returns
# whatever that DEL call returns.
(LUA_ICK_PREFIX + %{ return redis.call('DEL',ick_key,ick_pset_key,ick_cset_key) }).freeze
- LUA_ICKSTATS =
LUA_ICKSTATS
KEYS, or nil if none.
Note: At redis.io/commands/eval, the “Lua to Redis conversion table” states:
Lua number -> Redis integer reply (the number is converted into an integer) ...If you want to return a float from Lua you should return it as a string. We follow this recommendation in our Lua below where we convert our numeric responses to strings with “tostring(tonumber(n))”.
# Script body appended to the shared prefix.  Returns nil when no version
# string is stored at the Ick key; otherwise returns a flat
# [name, value, ...] table with the key names, version, and the ZCARD
# sizes of both sets.  The *_min / *_max entries are only inserted for
# non-empty sets and are emitted as strings via tostring(tonumber(...)).
(LUA_ICK_PREFIX + %{ if (false == ick_ver) then return nil end local ick_pset_size = redis.call('ZCARD',ick_pset_key) local ick_cset_size = redis.call('ZCARD',ick_cset_key) local ick_stats = { 'key', ick_key, 'pset_key', ick_pset_key, 'cset_key', ick_cset_key, 'ver', ick_ver, 'cset_size', ick_cset_size, 'pset_size', ick_pset_size, 'total_size', ick_cset_size + ick_pset_size, } local pset_min = nil local pset_max = nil if ick_pset_size > 0 then pset_min = redis.call('ZRANGE',ick_pset_key, 0, 0,'WITHSCORES')[2] table.insert(ick_stats, 'pset_min') table.insert(ick_stats, tostring(tonumber(pset_min))) pset_max = redis.call('ZRANGE',ick_pset_key,-1,-1,'WITHSCORES')[2] table.insert(ick_stats, 'pset_max') table.insert(ick_stats, tostring(tonumber(pset_max))) end local cset_min = nil local cset_max = nil if ick_cset_size > 0 then cset_min = redis.call('ZRANGE',ick_cset_key, 0, 0,'WITHSCORES')[2] table.insert(ick_stats, 'cset_min') table.insert(ick_stats, tostring(tonumber(cset_min))) cset_max = redis.call('ZRANGE',ick_cset_key,-1,-1,'WITHSCORES')[2] table.insert(ick_stats, 'cset_max') table.insert(ick_stats, tostring(tonumber(cset_max))) end local total_min = nil if pset_min and cset_min then total_min = math.min(cset_min,pset_min) elseif pset_min then total_min = pset_min elseif cset_min then total_min = cset_min end if total_min then table.insert(ick_stats, 'total_min') table.insert(ick_stats, tostring(tonumber(total_min))) end local total_max = nil if pset_max and cset_max then total_max = math.max(cset_max,pset_max) elseif pset_max then total_max = pset_max elseif cset_max then total_max = cset_max end if total_max then table.insert(ick_stats, 'total_max') table.insert(ick_stats, tostring(tonumber(total_max))) end return ick_stats }).freeze
- LUA_ICKADD =
LUA_ICKADD
Adds members to the pset as per ZADD. Where a member is re-written, we always take the lowest score.
Thus, scores are only allowed to move downward. Returns a pair: the number of new members followed by the number of members whose score changed.
Creates the Ick if necessary.
# Script body appended to the shared prefix.  ARGV is a flat
# score,member,score,member,... list (an odd count is rejected).  Each
# member is ZADDed to the pset when it is absent or when the new score is
# lower than the stored one.  The root key is then marked 'ick.v1' via
# SETNX and the script returns { num_new, num_changed }.
(LUA_ICK_PREFIX + %{ local num_args = table.getn(ARGV) if 1 == (num_args % 2) then return redis.error_reply("odd number of arguments for 'ickadd' command") end local num_new = 0 local num_changed = 0 for i = 1,num_args,2 do local score = tonumber(ARGV[i]) local member = ARGV[i+1] local old_score = redis.call('ZSCORE',ick_pset_key,member) if false == old_score then redis.call('ZADD',ick_pset_key,score,member) num_new = num_new + 1 elseif score < tonumber(old_score) then redis.call('ZADD',ick_pset_key,score,member) num_changed = num_changed + 1 end end redis.call('SETNX', ick_key, 'ick.v1') return { num_new, num_changed } }).freeze
- LUA_ICKEXCHANGE =
LUA_ICKEXCHANGE: commit then reserve
Commit Function
Removes the members specified in ARGV[3..] from the cset, then tops up the cset to up to size ARGV[1] by shifting the lowest-scored members over from the pset.
The cset might already be full, in which case we may shift fewer than ARGV elements.
Reserve Function
Tops up the cset to up to size ARGV by shifting the lowest-scored members over from the pset.
The cset might already be full, in which case we may shift fewer than ARGV elements.
The same score-folding happens as per ICKADD. Thus where there are duplicate messages, we may remove more members from the pset than we add to the cset.
Returns the number of members removed from the cset by the commit function, followed by up to ARGV[1] pairs
- member,score,…
-
from the reserve function.
Note: This Lua walks ARGV with an index loop (and copies results with the iterator ipairs()) instead of using unpack(), to avoid a “too many results to unpack” failure at 8000 args. However, the loop over many redis.call is regrettably heavy-weight. From a performance standpoint it would be preferable to call ZREM in larger batches.
# Script body appended to the shared prefix.  ARGV layout:
#   ARGV[1]    reserve_size (number)
#   ARGV[2]    the literal 'backwash' to enable backwash, anything else not
#   ARGV[3..]  members to ZREM from the cset (the commit)
# With backwash, every cset member is folded back into the pset (keeping
# the lower score) and the cset is deleted.  The cset is then topped up
# from the lowest-scored pset members until it holds reserve_size entries
# or the pset is empty.  The root key is marked 'ick.v1' via SETNX.
# Returns { num_committed, member, score, ... } with pairs only when
# reserve_size > 0.
(LUA_ICK_PREFIX + %{ local reserve_size = tonumber(ARGV[1]) local backwash = ARGV[2] local argc = table.getn(ARGV) local num_committed = 0 for i = 3,argc,1 do local num_zrem = redis.call('ZREM',ick_cset_key,ARGV[i]) num_committed = num_committed + num_zrem end if 'backwash' == backwash then local cset_all = redis.call('ZRANGE',ick_cset_key,0,-1,'WITHSCORES') local cset_size = table.getn(cset_all) for i = 1,cset_size,2 do local member = cset_all[i] local score = cset_all[i+1] local old_score = redis.call('ZSCORE',ick_pset_key,member) if false == old_score then redis.call('ZADD',ick_pset_key,score,member) elseif tonumber(score) < tonumber(old_score) then redis.call('ZADD',ick_pset_key,score,member) end end redis.call('DEL',ick_cset_key) end while true do local cset_size = redis.call('ZCARD',ick_cset_key) if cset_size and reserve_size <= cset_size then break end local first_pset = redis.call('ZRANGE',ick_pset_key,0,0,'WITHSCORES') if 0 == table.getn(first_pset) then break end local first_member = first_pset[1] local first_score = tonumber(first_pset[2]) redis.call('ZREM',ick_pset_key,first_member) local old_score = redis.call('ZSCORE',ick_cset_key,first_member) if false == old_score or first_score < tonumber(old_score) then redis.call('ZADD',ick_cset_key,first_score,first_member) end end redis.call('SETNX', ick_key, 'ick.v1') local result = { num_committed } if reserve_size > 0 then local max = reserve_size - 1 local cset_batch = redis.call('ZRANGE',ick_cset_key,0,max,'WITHSCORES') for _i,v in ipairs(cset_batch) do table.insert(result,v) end end return result }).freeze
- VERSION =
# Frozen version string for Redis::Ick.
'0.1.1'.freeze
Instance Attribute Summary collapse
-
#redis ⇒ Object
Returns the value of attribute redis.
-
#statsd ⇒ Object
Returns the value of attribute statsd.
Class Method Summary collapse
-
._floatify(str) ⇒ Object
Converts a string str into a Float, and recognizes ‘inf’, ‘-inf’, etc.
Instance Method Summary collapse
-
#_eval(lua, ick_key, *args) ⇒ Object
Runs the specified lua in the redis against the specified Ick.
-
#_postprocess(raw_results, callback) ⇒ Object
Calls back to block with the results.
-
#_statsd_increment(metric) ⇒ Object
Reports a single count on the requested metric to statsd (if any).
-
#_statsd_time(metric) ⇒ Object
Executes the block (if any) and reports its timing in milliseconds on the requested metric to statsd (if any).
-
#_statsd_timing(metric, time) ⇒ Object
Reports the specified timing on the requested metric to statsd (if any).
-
#ickadd(ick_key, *score_member_pairs) ⇒ Object
Adds all the specified members with the specified scores to the Ick stored at key.
-
#ickcommit(ick_key, *members) ⇒ Object
Removes the indicated members from the consumer set, if present.
-
#ickdel(ick_key) ⇒ Object
Removes all data associated with the Ick in Redis at key.
-
#ickexchange(ick_key, reserve_size, *commit_members, backwash: false) ⇒ Object
ickexchange combines several functions in one Redis round-trip.
-
#ickreserve(ick_key, max_size = 0, backwash: false) ⇒ Object
Tops up the consumer set up to max_size by shifting the lowest-scored elements from the producer set into the consumer set until the consumer set cardinality reaches max_size or the producer set is exhausted.
-
#ickstats(ick_key) ⇒ Object
Fetches stats.
-
#initialize(redis, statsd: nil) ⇒ Ick
constructor
Creates an Ick accessor.
Constructor Details
#initialize(redis, statsd: nil) ⇒ Ick
Creates an Ick accessor.
If statsd is given, it must respond to :increment, :timing, and :time.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/redis/ick.rb', line 19
#
# Creates an Ick accessor.
#
# @param redis [Redis] the connection used to run the Ick scripts
# @param statsd [#increment, #timing, #time, nil] optional metrics sink;
#   when given it must respond to all three methods
# @raise [ArgumentError] if redis is not a Redis, or if statsd is given
#   but lacks one of the required methods
def initialize(redis, statsd: nil)
  raise ArgumentError, "not a Redis: #{redis}" unless redis.is_a?(Redis)

  if statsd
    # Checked in this order so the first missing method is the one reported.
    %i[increment timing time].each do |required|
      next if statsd.respond_to?(required)

      raise ArgumentError, "no statsd.#{required}"
    end
  end
  @redis = redis
  @statsd = statsd
end
Instance Attribute Details
#redis ⇒ Object
Returns the value of attribute redis.
38 39 40 |
# File 'lib/redis/ick.rb', line 38 def redis @redis end |
#statsd ⇒ Object
Returns the value of attribute statsd.
39 40 41 |
# File 'lib/redis/ick.rb', line 39 def statsd @statsd end |
Class Method Details
._floatify(str) ⇒ Object
Converts a string str into a Float, and recognizes ‘inf’, ‘-inf’, etc.
So we can be certain of compatibility, this was stolen with tweaks from:
https://github.com/redis/redis-rb/blob/master/lib/redis.rb#L2876-L2885
450 451 452 453 454 455 456 457 |
# File 'lib/redis/ick.rb', line 450
#
# Converts a string into a Float, recognizing 'inf', '-inf', etc.
#
# @param str [String] a numeric string or an (optionally negated)
#   infinity spelling beginning with "inf", case-insensitive
# @return [Float]
# @raise [ArgumentError] if str is not a String, or is neither an
#   infinity spelling nor parseable by Kernel#Float
def self._floatify(str)
  raise ArgumentError, "not String: #{str}" unless str.is_a?(String)

  # \A rather than ^: the infinity marker must begin the string itself,
  # not merely some later line of a multi-line string.
  if (inf = str.match(/\A(-)?inf/i))
    (inf[1] ? -1.0 : 1.0) / 0.0
  else
    Float(str)
  end
end
Instance Method Details
#_eval(lua, ick_key, *args) ⇒ Object
Runs the specified lua in the redis against the specified Ick.
461 462 463 464 465 466 467 468 469 |
# File 'lib/redis/ick.rb', line 461
#
# Runs a Lua script against one Ick.
#
# The Ick root key is handed over as the script's only KEYS entry and the
# remaining arguments are handed over as ARGV, via
# Redis::ScriptManager.eval_gently.
#
# @param lua [String] the script source
# @param ick_key [String] root key of the Ick
# @param args [Array] extra arguments forwarded as ARGV
# @raise [ArgumentError] if lua or ick_key is not a String
def _eval(lua, ick_key, *args)
  raise ArgumentError, "bogus non-String lua #{lua}" unless lua.is_a?(String)
  raise ArgumentError, "bogus non-String ick_key #{ick_key}" unless ick_key.is_a?(String)

  script_keys = [ick_key]
  Redis::ScriptManager.eval_gently(redis, lua, script_keys, args)
end
#_postprocess(raw_results, callback) ⇒ Object
Calls back to block with the results.
If raw_results is a Redis::Future, callback will be deferred until the future is expanded.
Otherwise, callback will happen immediately.
375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 |
# File 'lib/redis/ick.rb', line 375 def _postprocess(raw_results,callback) if raw_results.is_a?(Redis::Future) # # Redis::Future have a built-in mechanism for calling a # transformation on the raw results. # # Here, we monkey-patch not the Redis::Future class, but just # this one raw_results object. We give ourselves a door to # set the post-processing transformation. # # The transformation will be called only once when the real # results are materialized. # class << raw_results def transformation=(transformation) raise "transformation collision" if @transformation @transformation = transformation end end raw_results.transformation = callback raw_results else # # If not Redis::Future, we invoke the callback immediately. # callback.call(raw_results) end end |
#_statsd_increment(metric) ⇒ Object
Reports a single count on the requested metric to statsd (if any).
45 46 47 |
# File 'lib/redis/ick.rb', line 45
#
# Bumps the named counter on statsd; a no-op (returning nil) when no
# statsd is configured.
#
# @param metric [String] counter name
def _statsd_increment(metric)
  return unless statsd

  statsd.increment(metric)
end
#_statsd_time(metric) ⇒ Object
Executes the block (if any) and reports its timing in milliseconds on the requested metric to statsd (if any).
65 66 67 68 69 70 71 72 73 |
# File 'lib/redis/ick.rb', line 65
#
# Runs the given block (if any), wrapping it in statsd.time(metric) when
# a statsd is configured.
#
# Without statsd the block's value (or nil when no block was given) is
# returned directly; with statsd the return value is whatever
# statsd.time returns.
#
# @param metric [String] timer name
def _statsd_time(metric)
  unless statsd
    return yield if block_given?

    return nil
  end
  statsd.time(metric) do
    yield if block_given?
  end
end
#_statsd_timing(metric, time) ⇒ Object
Reports the specified timing on the requested metric to statsd (if any).
54 55 56 |
# File 'lib/redis/ick.rb', line 54
#
# Forwards a timing value to statsd; a no-op (returning nil) when no
# statsd is configured.
#
# @param metric [String] metric name
# @param time [Numeric] value to report
def _statsd_timing(metric, time)
  return unless statsd

  statsd.timing(metric, time)
end
#ickadd(ick_key, *score_member_pairs) ⇒ Object
Adds all the specified members with the specified scores to the Ick stored at key.
Entries are stored in order by score. Lower-scored entries will pop out in reserve before higher-scored entries. Re-adding an entry which already exists in the producer set with a new score results in the entry having the lowest of the old and new scores.
Similar to redis.io/commands/zadd with a modified NX option, operating on the producer set.
Usage:
ick.ickadd(ick_key,score,member[,score,member]*)
Suggested usage is for scores to be a Unix timestamp indicating when something became dirty.
Returns a pair: the number of new members followed by the number of changed scores.
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/redis/ick.rb', line 169
#
# Validates the arguments, reports call metrics, then runs LUA_ICKADD
# with the flat score,member,... list as its arguments.
#
# @param ick_key [String] root key of the Ick
# @param score_member_pairs [Array] alternating Numeric score and String
#   member values
# @raise [ArgumentError] on a non-String key, an odd number of pair
#   values, a non-Numeric score, or a non-String member
# @return whatever _statsd_time returns for the wrapped _eval call
def ickadd(ick_key, *score_member_pairs)
  unless ick_key.is_a?(String)
    raise ArgumentError, "bogus non-String ick_key #{ick_key}"
  end
  if score_member_pairs.size.odd?
    raise ArgumentError, "bogus odd-numbered #{score_member_pairs}"
  end

  score_member_pairs.each_slice(2) do |score, member|
    raise ArgumentError, "bogus non-Numeric score #{score}" unless score.is_a?(Numeric)
    raise ArgumentError, "bogus non-String member #{member}" unless member.is_a?(String)
  end

  pair_count = score_member_pairs.size / 2
  _statsd_increment('profile.ick.ickadd.calls')
  _statsd_timing('profile.ick.ickadd.pairs', pair_count)
  _statsd_time('profile.ick.time.ickadd') { _eval(LUA_ICKADD, ick_key, *score_member_pairs) }
end
#ickcommit(ick_key, *members) ⇒ Object
Removes the indicated members from the consumer set, if present.
Similar to ZREM ick_key [member]*, per redis.io/commands/zrem, operating on the consumer set only.
Usage:
ick.ickcommit(ick_key,memberA,memberB,...)
Committed elements are meant to represent consumer work-completed.
Returns the number of members removed from the consumer set, not including non-existing members.
270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 |
# File 'lib/redis/ick.rb', line 270
#
# Commits members by running LUA_ICKEXCHANGE with a reserve size of 0,
# then keeps only the first element of the script's reply
# (num_committed).
#
# @param ick_key [String] root key of the Ick
# @param members [Array] members to commit
# @raise [ArgumentError] if ick_key is not a String
# @return the first element of the script results, as produced by
#   _postprocess
def ickcommit(ick_key, *members)
  raise ArgumentError, "bogus non-String ick_key #{ick_key}" unless ick_key.is_a?(String)

  _statsd_increment('profile.ick.ickcommit.calls')
  _statsd_timing('profile.ick.ickcommit.members', members.size)

  reply = nil
  _statsd_time('profile.ick.time.ickcommit') do
    # Reserve size 0 and a false backwash flag: backwash is not relevant
    # in ickcommit.
    reply = _eval(LUA_ICKEXCHANGE, ick_key, 0, false, *members)
  end

  # The reply is num_committed followed by 0 message-and-score pairs;
  # only num_committed is wanted.
  take_num_committed = ->(results) { results[0] }
  _postprocess(reply, take_num_committed)
end
#ickdel(ick_key) ⇒ Object
Removes all data associated with the Ick in Redis at key.
Similar to DEL key, redis.io/commands/del, but may delete multiple keys which together implement the Ick data structure.
Returns the number of Redis keys deleted, which will be >= 1 if an Ick existed at key.
86 87 88 89 90 91 92 93 94 |
# File 'lib/redis/ick.rb', line 86
#
# Reports call metrics, then runs LUA_ICKDEL against the Ick.
#
# @param ick_key [String] root key of the Ick
# @raise [ArgumentError] if ick_key is not a String
# @return whatever _statsd_time returns for the wrapped _eval call
def ickdel(ick_key)
  raise ArgumentError, "bogus non-String ick_key #{ick_key}" unless ick_key.is_a?(String)

  _statsd_increment('profile.ick.ickdel.calls')
  # NOTE(review): this timer is named 'profile.ick.ickdel.time' while the
  # other operations use 'profile.ick.time.<op>'; kept as-is because the
  # metric name is observable outside this class.
  _statsd_time('profile.ick.ickdel.time') { _eval(LUA_ICKDEL, ick_key) }
end
#ickexchange(ick_key, reserve_size, *commit_members, backwash: false) ⇒ Object
ickexchange combines several functions in one Redis round-trip.
-
As ickcommit, removes consumed members from the consumer set.
-
As ickreserve, tops up the consumer set from the producer and returns the requested new consumer members, if any.
If backwash is true, cset members with high scores are swapped out for pset members with lower scores. Otherwise cset members remain in the cset until committed regardless of how low scores in the pset might be.
Redis.current.zrange() withscores: [ message, score ] representing the lowest-scored elements from the producer set after the commit and reserve operations.
318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 |
# File 'lib/redis/ick.rb', line 318
#
# Commits and reserves in one call by running LUA_ICKEXCHANGE, then
# post-processes the reply with Skip0ThenFloatifyPairs.
#
# @param ick_key [String] root key of the Ick
# @param reserve_size [Integer] non-negative target size for the reserve
# @param commit_members [Array] members to commit
# @param backwash [Boolean] when truthy, the script is passed the
#   'backwash' flag
# @raise [ArgumentError] on a non-String key, or a non-Integer or
#   negative reserve_size
# @return the post-processed script results, as produced by _postprocess
def ickexchange(ick_key, reserve_size, *commit_members, backwash: false)
  unless ick_key.is_a?(String)
    raise ArgumentError, "bogus non-String ick_key #{ick_key}"
  end
  unless reserve_size.is_a?(Integer)
    raise ArgumentError, "bogus non-Integer reserve_size #{reserve_size}"
  end
  if reserve_size < 0
    raise ArgumentError, "bogus negative reserve_size #{reserve_size}"
  end

  _statsd_increment('profile.ick.ickexchange.calls')
  _statsd_timing('profile.ick.ickexchange.reserve_size', reserve_size)
  _statsd_timing(
    'profile.ick.ickexchange.commit_members',
    commit_members.size
  )
  raw_results = nil
  _statsd_time('profile.ick.time.ickexchange') do
    # Splat the members (as ickcommit does) so each one is its own
    # script argument, rather than handing over one nested Array and
    # depending on something downstream to flatten it.
    raw_results = _eval(
      LUA_ICKEXCHANGE,
      ick_key,
      reserve_size,
      backwash ? 'backwash' : false,
      *commit_members
    )
  end
  _postprocess(raw_results, Skip0ThenFloatifyPairs)
end
#ickreserve(ick_key, max_size = 0, backwash: false) ⇒ Object
Tops up the consumer set up to max_size by shifting the lowest-scored elements from the producer set into the consumer set until the consumer set cardinality reaches max_size or the producer set is exhausted.
The reserved elements are meant to represent consumer work-in-progress. If they are not committed, they will be returned again in future calls to ickreserve.
Note that the Lua for ick is irritating like so:
- you add in the pattern [ score_number, member_string, ... ]
- you retrieve in the pattern [ member_string, score_string, ... ]
Native ZADD and ZRANGE WITHSCORES exhibit this same irritating inconsistency: Ick is annoyance-compatible with Redis sorted sets.
However, by analogy with redis-rb’s Redis.current.zrange(), this Ruby wrapper method pairs up the results for you, and converts the string scores to floats.
- you get from this method [[ member_string, score_number] , ... ]
If backwash is true, cset members with high scores are swapped out for pset members with lower scores. Otherwise cset members remain in the cset until committed regardless of how low scores in the pset might be.
Redis.current.zrange() withscores: [ member_string, score_number ] representing the lowest-scored elements from the producer set.
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 |
# File 'lib/redis/ick.rb', line 228
#
# Reserves by running LUA_ICKEXCHANGE with no members to commit, then
# post-processes the reply with Skip0ThenFloatifyPairs.
#
# @param ick_key [String] root key of the Ick
# @param max_size [Integer] non-negative target size (default 0)
# @param backwash [Boolean] when truthy, the script is passed the
#   'backwash' flag
# @raise [ArgumentError] on a non-String key, or a non-Integer or
#   negative max_size
# @return the post-processed script results, as produced by _postprocess
def ickreserve(ick_key, max_size = 0, backwash: false)
  raise ArgumentError, "bogus non-String ick_key #{ick_key}" unless ick_key.is_a?(String)
  raise ArgumentError, "bogus non-Integer max_size #{max_size}" unless max_size.is_a?(Integer)
  raise ArgumentError, "bogus negative #{max_size}" if max_size < 0

  _statsd_increment('profile.ick.ickreserve.calls')
  _statsd_timing('profile.ick.ickreserve.max_size', max_size)

  backwash_flag = backwash ? 'backwash' : false
  reply = nil
  _statsd_time('profile.ick.time.ickreserve') do
    reply = _eval(LUA_ICKEXCHANGE, ick_key, max_size, backwash_flag)
  end
  _postprocess(reply, Skip0ThenFloatifyPairs)
end
#ickstats(ick_key) ⇒ Object
Fetches stats.
Returns a Hash of stats about the Ick at ick_key, if any, else nil. If called within a pipeline, returns a Redis::Future whose value is a Hash or nil as before.
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/redis/ick.rb', line 105
#
# Runs LUA_ICKSTATS and converts its flat [k,v,k,v,...] reply into a Hash.
#
# @param ick_key [String] root key of the Ick
# @raise [ArgumentError] if ick_key is not a String
# @return the Hash of stats, or nil when the script returned nothing, as
#   produced by _postprocess
def ickstats(ick_key)
  unless ick_key.is_a?(String)
    raise ArgumentError, "bogus non-String ick_key #{ick_key}"
  end

  _statsd_increment('profile.ick.ickstats.calls')
  raw_results = nil
  _statsd_time('profile.ick.time.ickstats') do
    raw_results = _eval(LUA_ICKSTATS,ick_key)
  end
  _postprocess(
    raw_results,
    lambda do |results|
      return nil unless results

      #
      # LUA_ICKSTATS returned bulk data response [k,v,k,v,...]
      #
      stats = Hash[*results]
      #
      # From http://redis.io/commands/eval, the "Lua to Redis conversion
      # table" states that:
      #
      #   Lua number -> Redis integer reply (the number is converted
      #   into an integer)
      #
      #   ...If you want to return a float from Lua you should return
      #   it as a string.
      #
      # LUA_ICKSTATS works around this by converting the *_min / *_max
      # stats to strings.  We reverse that conversion here.
      #
      # All-digit strings become Integers.  Everything else goes through
      # _floatify rather than String#to_f, because to_f turns the
      # infinite scores 'inf' and '-inf' into 0.0.
      #
      stats.keys.select { |k| k.end_with?('_min', '_max') }.each do |k|
        value = stats[k]
        next unless value

        stats[k] = (/\A\d+\z/ =~ value) ? value.to_i : ::Redis::Ick._floatify(value)
      end
      stats
    end
  )
end