Class: Redis::Ick

Inherits:
Object
  • Object
show all
Defined in:
lib/redis/ick.rb,
lib/redis/ick/version.rb

Overview

Binds Lua code to provide the Ick operations in Redis.

Constant Summary collapse

LUA_ICK_PREFIX =

LUA_ICK_PREFIX

A snippet of Lua code which is common to all the Ick scripts.

For convenience and to avoid repeating code, we set up some computed key names.

For safety, we check that the ick_ver, ick_pset, and ick_cset either do not exist or exit with the correct types and values to be identifiable as an Ick.

All scripts in the LUA_ICK series expect only one KEYS, the root key of the Ick data structure. We expect a version flag as a string at this key. Keys for other data are computed from KEYS in such a way as to guarantee they all hash to the same slot.

%{
  local ick_key        = KEYS[1]
  local ick_ver        = redis.call('GET',ick_key)
  local ick_pset_key   = ick_key .. '/ick/{' .. ick_key .. '}/pset'
  local ick_cset_key   = ick_key .. '/ick/{' .. ick_key .. '}/cset'
  local ick_ver_type   = redis.call('TYPE',ick_key).ok
  local ick_pset_type  = redis.call('TYPE',ick_pset_key).ok
  local ick_cset_type  = redis.call('TYPE',ick_cset_key).ok
  if (false ~= ick_ver and 'ick.v1' ~= ick_ver) then
    return redis.error_reply('unrecognized ick version ' .. ick_ver)
  end
  if ('none' ~= ick_ver_type and 'string' ~= ick_ver_type) then
    return redis.error_reply('ick defense: expected string at ' ..
                             ick_ver_key .. ', found ' .. ick_ver_type)
  end
  if ('none' ~= ick_pset_type and 'zset' ~= ick_pset_type) then
    return redis.error_reply('ick defense: expected string at ' ..
                             ick_pset_key .. ', found ' .. ick_pset_type)
  end
  if ('none' ~= ick_cset_type and 'zset' ~= ick_cset_type) then
    return redis.error_reply('ick defense: expected string at ' ..
                             ick_cset_key .. ', found ' .. ick_cset_type)
  end
  if ('none' == ick_ver_type) then
    if ('none' ~= ick_pset_type) then
      return redis.error_reply('ick defense: no ver at ' .. ick_ver_key ..
                               ', but found pset at ' .. ick_pset_key)
    end
    if ('none' ~= ick_cset_type) then
      return redis.error_reply('ick defense: no ver at ' .. ick_ver_key ..
                               ', but found cset at ' .. ick_cset_key)
    end
  end
}.freeze
LUA_ICKDEL =

LUA_ICKDEL

Removes all keys associated with the Ick at KEYS.

only if no Ick existed at KEYS

Returns:

  • the number of Redis keys deleted, which will be 0 if and

(LUA_ICK_PREFIX + %{
  return redis.call('DEL',ick_key,ick_pset_key,ick_cset_key)
}).freeze
LUA_ICKSTATS =

LUA_ICKSTATS

KEYS, or nil if none.

Note: At redis.io/commands/eval, the “Lua to Redis conversion table” stats:

Lua number -> Redis integer reply (the number is converted
into an integer)

...If you want to return a float from Lua you should return
it as a string.

We follow this recommendation in our Lua below where we convert our numeric responses to strings with “tostring(tonumber(n))”.

Returns:

  • a bulk data response with statistics about the Ick at

(LUA_ICK_PREFIX + %{
  if (false == ick_ver) then
    return nil
  end
  local ick_pset_size = redis.call('ZCARD',ick_pset_key)
  local ick_cset_size = redis.call('ZCARD',ick_cset_key)
  local ick_stats     = {
    'key',        ick_key,
    'keys',       { ick_key, ick_pset_key, ick_cset_key },
    'ver',        ick_ver,
    'cset_size',  ick_cset_size,
    'pset_size',  ick_pset_size,
    'total_size', ick_cset_size + ick_pset_size,
  }
  local pset_min = nil
  local pset_max = nil
  if ick_pset_size > 0 then
    pset_min = redis.call('ZRANGE',ick_pset_key, 0, 0,'WITHSCORES')[2]
    table.insert(ick_stats, 'pset_min')
    table.insert(ick_stats, tostring(tonumber(pset_min)))
    pset_max = redis.call('ZRANGE',ick_pset_key,-1,-1,'WITHSCORES')[2]
    table.insert(ick_stats, 'pset_max')
    table.insert(ick_stats, tostring(tonumber(pset_max)))
  end
  local cset_min = nil
  local cset_max = nil
  if ick_cset_size > 0 then
    cset_min = redis.call('ZRANGE',ick_cset_key, 0, 0,'WITHSCORES')[2]
    table.insert(ick_stats, 'cset_min')
    table.insert(ick_stats, tostring(tonumber(cset_min)))
    cset_max = redis.call('ZRANGE',ick_cset_key,-1,-1,'WITHSCORES')[2]
    table.insert(ick_stats, 'cset_max')
    table.insert(ick_stats, tostring(tonumber(cset_max)))
  end
  local total_min = nil
  if pset_min and cset_min then
    total_min = math.min(cset_min,pset_min)
  elseif pset_min then
    total_min = pset_min
  elseif cset_min then
    total_min = cset_min
  end
  if total_min then
    table.insert(ick_stats, 'total_min')
    table.insert(ick_stats, tostring(tonumber(total_min)))
  end
  local total_max = nil
  if pset_max and cset_max then
    total_max = math.max(cset_max,pset_max)
  elseif pset_max then
    total_max = pset_max
  elseif cset_max then
    total_max = cset_max
  end
  if total_max then
    table.insert(ick_stats, 'total_max')
    table.insert(ick_stats, tostring(tonumber(total_max)))
  end
  return ick_stats
}).freeze
LUA_ICKADD =

LUA_ICKADD

Adds members to the cset as per ZADD. Where a member is re-written, we always take the lowest score.

Thus, scores are only allowed to move downward. changes to score.

Creates the Ick if necessary.

Returns:

  • a pair of numbers [num_new, num_changed]

(LUA_ICK_PREFIX + %{
  local num_args    = table.getn(ARGV)
  if 1 == (num_args % 2) then
    return redis.error_reply("odd number of arguments for 'ickadd' command")
  end
  local num_new     = 0
  local num_changed = 0
  for i = 1,num_args,2 do
    local score     = tonumber(ARGV[i])
    local member    = ARGV[i+1]
    local old_score = redis.call('ZSCORE',ick_pset_key,member)
    if false == old_score then
      redis.call('ZADD',ick_pset_key,score,member)
      num_new       = num_new + 1
    elseif score < tonumber(old_score) then
      redis.call('ZADD',ick_pset_key,score,member)
      num_changed   = num_changed + 1
    end
  end
  redis.call('SETNX', ick_key, 'ick.v1')
  return { num_new, num_changed }
}).freeze
LUA_ICKRESERVE =

LUA_ICKRESERVE

Tops up the cset to up to size ARGV by shifting the lowest-scored members over from the pset.

The cset might already be full, in which case we may shift fewer than ARGV elements.

The same score-folding happens as per ICKADD. Thus where there are duplicate messages, we may remove more members from the pset than we add to the cset.

size for cset and to be returned

Returns:

  • a bulk response, up to ARGV pairs [member,score,…]

(LUA_ICK_PREFIX + %{
  local target_cset_size = tonumber(ARGV[1])
  while true do
    local ick_cset_size  = redis.call('ZCARD',ick_cset_key)
    if ick_cset_size and target_cset_size <= ick_cset_size then
      break
    end
    local first_in_pset  = 
      redis.call('ZRANGE',ick_pset_key,0,0,'WITHSCORES')
    if 0 == table.getn(first_in_pset) then
      break
    end
    local first_member   = first_in_pset[1]
    local first_score    = tonumber(first_in_pset[2])
    redis.call('ZREM',ick_pset_key,first_member)
    local old_score      = redis.call('ZSCORE',ick_cset_key,first_member)
    if false == old_score or first_score < tonumber(old_score) then
      redis.call('ZADD',ick_cset_key,first_score,first_member)
    end
  end
  redis.call('SETNX', ick_key, 'ick.v1')
  if target_cset_size <= 0 then
    return {}
  else
    local max            = target_cset_size - 1
    return redis.call('ZRANGE',ick_cset_key,0,max,'WITHSCORES')
  end
}).freeze
LUA_ICKCOMMIT =

LUA_ICKCOMMIT

Removes specified members from the pset.

Note: This this Lua unpacks ARGV with the iterator ipairs() instead of unpack() to avoid a “too many results to unpack” failure at 8000 args. However, the loop over many redis.call is regrettably heavy-weight. From a performance standpoint it would be preferable to call ZREM in larger batches.

Returns:

  • the number of members removed

(LUA_ICK_PREFIX + %{
  redis.call('SETNX', ick_key, 'ick.v1')
  if 0 == table.getn(ARGV) then
    return 0
  end
  local num_removed = 0
  for i,v in ipairs(ARGV) do
    num_removed = num_removed + redis.call('ZREM',ick_cset_key,v)
  end
  return num_removed
}).freeze
VERSION =

Version plan/history:

0.0.1 - Still in Prosperworks/ALI/vendor/gems/redis-ick.

0.0.2 - Broke out into Prosperworks/redis-ick, make public.

0.0.3 - Got .travis.yml working with a live redis-server.

Runtime dependency on redis-script_manager for
Ick._eval.

Initial Rubocop integration.

Misc cleanup.

0.0.4 - Imported text from original design doc to README.md, polish.

Rubocop polish and defiance.

Development dependency on redis-key_hash to test
prescriptive hash claims.  Identified limits of
prescriptive hash robustness.

0.1.0 - (future) Big README.md and Rdoc update, solicit feedback

from select external beta users.

0.2.0 - (future) Incorporate feedback, announce.

'0.0.4'.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(redis, statsd: nil) ⇒ Ick

Creates an Ick accessor.

to :increment and :timing.

Parameters:

  • redis

    Redis

  • statsd (defaults to: nil)

    a stats proxy. May be nil, else expected to respond



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/redis/ick.rb', line 19

def initialize(redis, statsd: nil)
  if !redis.is_a?(Redis)
    raise ArgumentError, "not a Redis: #{redis}"
  end
  if statsd
    if !statsd.respond_to?(:increment)
      raise ArgumentError, 'no statsd.increment'
    end
    if !statsd.respond_to?(:timing)
      raise ArgumentError, 'no statsd.timeing'
    end
    if !statsd.respond_to?(:time)
      raise ArgumentError, 'no statsd.time'
    end
  end
  @redis  = redis
  @statsd = statsd
end

Instance Attribute Details

#redisObject

Returns the value of attribute redis.



38
39
40
# File 'lib/redis/ick.rb', line 38

def redis
  @redis
end

#statsdObject

Returns the value of attribute statsd.



39
40
41
# File 'lib/redis/ick.rb', line 39

def statsd
  @statsd
end

Class Method Details

._floatify(str) ⇒ Object

Converts a string str into a Float, and recognizes ‘inf’, ‘-inf’, etc.

So we can be certain of compatibility, this was stolen with tweaks from github.com/redis/redis-rb/blob/master/lib/redis.rb.

Raises:

  • (ArgumentError)


315
316
317
318
319
320
321
322
# File 'lib/redis/ick.rb', line 315

def self._floatify(str)
  raise ArgumentError, "not String: #{str}" if !str.is_a?(String)
  if (inf = str.match(/^(-)?inf/i))
    (inf[1] ? -1.0 : 1.0) / 0.0
  else
    Float(str)
  end
end

._postprocess_ickstats_results(raw_ickstats_results) ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/redis/ick.rb', line 131

def self._postprocess_ickstats_results(raw_ickstats_results)
  return nil if !raw_ickstats_results
  #
  # LUA_ICKSTATS returned bulk data response [k,v,k,v,...]
  #
  stats = Hash[*raw_ickstats_results]
  #
  # From http://redis.io/commands/eval, the "Lua to Redis conversion
  # table" states that:
  #
  #   Lua number -> Redis integer reply (the number is converted
  #   into an integer)
  #
  #   ...If you want to return a float from Lua you should return
  #   it as a string.
  #
  # LUA_ICKSTATS works around this by converting certain stats to
  # strings.  We reverse that conversion here.
  #
  stats.keys.select{|k|/_min$/ =~ k || /_max$/ =~ k}.each do |k|
    next if !stats[k]
    stats[k] = (/^\d+$/ =~ stats[k]) ? stats[k].to_i : stats[k].to_f
  end
  stats
end

Instance Method Details

#_eval(lua, ick_key, *args) ⇒ Object

Runs the specified lua in the redis against the specifified Ick.



326
327
328
329
330
331
332
333
334
# File 'lib/redis/ick.rb', line 326

def _eval(lua,ick_key,*args)
  if !lua.is_a?(String)
    raise ArgumentError, "bogus non-String lua #{lua}"
  end
  if !ick_key.is_a?(String)
    raise ArgumentError, "bogus non-String ick_key #{ick_key}"
  end
  Redis::ScriptManager.eval_gently(redis,lua,[ick_key],args)
end

#_statsd_increment(metric) ⇒ Object

Reports a single count on the requested metric to statsd (if any).

Parameters:

  • metric

    String



45
46
47
# File 'lib/redis/ick.rb', line 45

def _statsd_increment(metric)
  statsd.increment(metric) if statsd
end

#_statsd_time(metric) ⇒ Object

Executes the block (if any) and reports its timing in milliseconds on the requested metric to statsd (if any).

Parameters:

  • metric

    String

Returns:

  • the value of the block, or nil if none



65
66
67
68
69
70
71
72
73
# File 'lib/redis/ick.rb', line 65

def _statsd_time(metric)
  if statsd
    statsd.time(metric) do
      block_given? ? yield : nil
    end
  else
    block_given? ? yield : nil
  end
end

#_statsd_timing(metric, time) ⇒ Object

Reports the specified timing on the requested metric to statsd (if any).

Parameters:

  • metric

    String



54
55
56
# File 'lib/redis/ick.rb', line 54

def _statsd_timing(metric,time)
  statsd.timing(metric,time) if statsd
end

#ickadd(ick_key, *score_member_pairs) ⇒ Object

Adds all the specified members with the specified scores to the Ick stored at key.

Entries are stored in order by score. Lower-scored entries will pop out in reserve before higher-scored entries. Re-adding an entry which already exists in the producer set with a new score results in the entry having the lowest of the old and new scores.

Similar to redis.io/commands/zadd with a modified NX option, operating on the producer set.

Usage:

ick.ickadd(ick_key,score,member[,score,member]*)

Suggested usage is for scores to be a Unix timestamp indicating when something became dirty.

changed scores.

Parameters:

  • ick_key

    String the base key for the Ick

  • score_member_pairs

    Array of Arrays of [score,message]

Returns:

  • a pair, the number of new values followed by the numer of



182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/redis/ick.rb', line 182

def ickadd(ick_key,*score_member_pairs)
  if !ick_key.is_a?(String)
    raise ArgumentError, "bogus non-String ick_key #{ick_key}"
  end
  if score_member_pairs.size.odd?
    raise ArgumentError, "bogus odd-numbered #{score_member_pairs}"
  end
  score_member_pairs.each_slice(2) do |slice|
    score, member = slice
    if ! score.is_a? Numeric
      raise ArgumentError, "bogus non-Numeric score #{score}"
    end
    if ! member.is_a? String
      raise ArgumentError, "bogus non-String member #{member}"
    end
  end
  _statsd_increment('profile.ick.ickadd.calls')
  _statsd_timing('profile.ick.ickadd.pairs',score_member_pairs.size / 2)
  _statsd_time('profile.ick.time.ickadd') do
    _eval(LUA_ICKADD,ick_key,*score_member_pairs)
  end
end

#ickcommit(ick_key, *members) ⇒ Object

Removes the indicated members from the producer set, if present.

Similar to ZREM ick_key [member]*, per redis.io/commands/zrem, operating on the consumer set only.

Usage:

ick.ickcommit(ick_key,memberA,memberB,...)

Committed elements are meant to represent consumer work-completed.

producer set, not including non existing members.

Parameters:

  • ick_key

    String the base key for the Ick

  • members

    members to be committed out pf the pset

Returns:

  • an integer, the number of members removed from the



298
299
300
301
302
303
304
305
306
307
# File 'lib/redis/ick.rb', line 298

def ickcommit(ick_key,*members)
  if !ick_key.is_a?(String)
    raise ArgumentError, "bogus non-String ick_key #{ick_key}"
  end
  _statsd_increment('profile.ick.ickcommit.calls')
  _statsd_timing('profile.ick.ickcommit.members',members.size)
  _statsd_time('profile.ick.time.ickcommit') do
    _eval(LUA_ICKCOMMIT,ick_key,*members)
  end
end

#ickdel(ick_key) ⇒ Object

Removes all data associated with the Ick in Redis at key.

Similar to DEL key, redis.io/commands/del, but may delete multiple keys which together implement the Ick data structure.

be >= 1 if an Ick existed at key.

Parameters:

  • ick_key

    String the base key for the Ick

Returns:

  • an integer, the number of Redis keys deleted, which will



86
87
88
89
90
91
92
93
94
# File 'lib/redis/ick.rb', line 86

def ickdel(ick_key)
  if !ick_key.is_a?(String)
    raise ArgumentError, "bogus non-String ick_key #{ick_key}"
  end
  _statsd_increment('profile.ick.ickdel.calls')
  _statsd_time('profile.ick.ickdel.time') do
    _eval(LUA_ICKDEL,ick_key)
  end
end

#ickreserve(ick_key, max_size = 0) ⇒ Object

Tops up the consumer set up to max_size by shifting the lowest-scored elements from the producer set into the consumer set until the consumer set cardinality reaches max_size or the producer set is exhausted.

The reserved elements are meant to represent consumer work-in-progress. If they are not committed, they will be returned again in future calls to ickreserve.

Note that the Lua for ick is irritating like so:

- you add in the pattern      [ score_number,  member_string, ... ]
- you retrieve in the pattern [ member_string, score_string, ... ]

Native ZADD and ZRANGE WITHSCORES exhibit this same irritating inconsistency: Ick is annoyance-compatible with Redis sorted sets.

However, by analogy with redis-rb’s Redis.current.zrange(), this Ruby wrapper method pairs up the results for you, and converts the string scores to floats.

- you get from this method    [[ member_string, score_number] , ... ]

Redis.current.zrange() withscores: [ member_string, score_number ] representing the lowest-scored elements from the producer set.

Parameters:

  • ick_key

    String the base key for the Ick

  • max_size (defaults to: 0)

    max number of messages to reserve

Returns:

  • a list of up to max_size pairs, similar to



236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
# File 'lib/redis/ick.rb', line 236

def ickreserve(ick_key,max_size=0)
  if !ick_key.is_a?(String)
    raise ArgumentError, "bogus non-String ick_key #{ick_key}"
  end
  if !max_size.is_a?(Integer)
    raise ArgumentError, "bogus non-Integer max_size #{max_size}"
  end
  if max_size < 0
    raise ArgumentError, "bogus negative #{max_size}"
  end
  _statsd_increment('profile.ick.ickreserve.calls')
  _statsd_timing('profile.ick.ickreserve.max_size',max_size)
  raw_ickreserve_results = nil
  _statsd_time('profile.ick.time.ickreserve') do
    raw_ickreserve_results =
      _eval(
        LUA_ICKRESERVE,
        ick_key,
        max_size
      )
  end
  if raw_ickreserve_results.is_a?(Redis::Future)
    #
    # We extend the Redis::Future with a continuation so we can
    # add our own post-processing.
    #
    class << raw_ickreserve_results
      alias_method :original_value, :value
      def value
        original_value.each_slice(2).map do |p|
          [ p[0], ::Redis::Ick._floatify(p[1]) ]
        end
      end
    end
    raw_ickreserve_results
  else
    results = raw_ickreserve_results.each_slice(2).map do |p|
      [ p[0], ::Redis::Ick._floatify(p[1]) ]
    end
    _statsd_timing('profile.ick.ickreserve.num_results',results.size)
    results
  end
end

#ickstats(ick_key) ⇒ Object

Fetches stats.

about the Ick at ick_key, if any, else nil. If called within a pipeline, returns a redis::Future whose value is a Hash or nil as before.

Parameters:

  • ick_key

    String the base key for the Ick

Returns:

  • If called outside a Redis pipeline, a Hash with stats



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/redis/ick.rb', line 105

def ickstats(ick_key)
  if !ick_key.is_a?(String)
    raise ArgumentError, "bogus non-String ick_key #{ick_key}"
  end
  _statsd_increment('profile.ick.ickstats.calls')
  raw_ickstats_results = nil
  _statsd_time('profile.ick.time.ickstats') do
    raw_ickstats_results = _eval(LUA_ICKSTATS,ick_key)
  end
  if raw_ickstats_results.is_a?(Redis::Future)
    #
    # We extend the Redis::Future with a continuation so we can add
    # our own post-processing.
    #
    class << raw_ickstats_results
      alias_method :original_value, :value
      def value
        ::Redis::Ick._postprocess_ickstats_results(original_value)
      end
    end
    raw_ickstats_results
  else
    ::Redis::Ick._postprocess_ickstats_results(raw_ickstats_results)
  end
end