Class: Queris::Query
- Inherits:
-
Object
- Object
- Queris::Query
- Defined in:
- lib/queris/query.rb,
lib/queris/query/page.rb,
lib/queris/query/timer.rb,
lib/queris/query/trace.rb,
lib/queris/query/operations.rb
Direct Known Subclasses
Defined Under Namespace
Classes: DiffOp, Error, IntersectOp, Op, Page, SortOp, Timer, Trace, UnionOp
Constant Summary collapse
- MINIMUM_QUERY_TTL =
seconds. Don’t mess with this number unless you fully understand it; setting it too small may lead to subquery race conditions.
30
Instance Attribute Summary collapse
-
#created_at ⇒ Object
Returns the value of attribute created_at.
-
#model ⇒ Object
Returns the value of attribute model.
-
#ops ⇒ Object
Returns the value of attribute ops.
-
#params ⇒ Object
Returns the value of attribute params.
-
#redis_prefix ⇒ Object
Returns the value of attribute redis_prefix.
-
#run_id ⇒ Object
Returns the value of attribute run_id.
-
#sort_ops ⇒ Object
Returns the value of attribute sort_ops.
-
#subqueries ⇒ Object
readonly
Returns the value of attribute subqueries.
-
#temp_key_ttl ⇒ Object
readonly
Returns the value of attribute temp_key_ttl.
-
#timer ⇒ Object
Returns the value of attribute timer.
-
#ttl ⇒ Object
Returns the value of attribute ttl.
Instance Method Summary collapse
- #add_temp_key(k) ⇒ Object
- #all_index_keys ⇒ Object
-
#all_indices ⇒ Object
list all indices (including subqueries).
- #all_live_indices ⇒ Object
-
#all_query_keys ⇒ Object
all keys related to a query.
- #all_subqueries ⇒ Object
- #count(opt = {}) ⇒ Object (also: #size, #length)
- #delta(arg = {}) ⇒ Object
- #diff(index, val = nil) ⇒ Object (also: #∖)
-
#each_operand(which_ops = nil) ⇒ Object
walk through all query operands.
-
#each_subquery(recursive = true) ⇒ Object
dependency-ordered.
- #explain(opt = {}) ⇒ Object
- #first_result ⇒ Object
-
#flush(arg = {}) ⇒ Object
(also: #clear)
recursively and conditionally flush query and subqueries. arg parameters — flush query if: :ttl - query.ttl <= ttl; :index (symbol or index or an array of them) - query uses th(is|ese) ind(ex|ices); or flush conditionally according to passed block: flush {|query| true }. When no parameters or block are present, flush only this query and no subqueries.
- #gather_ready_data(r) ⇒ Object
- #id ⇒ Object
-
#indices(opt = {}) ⇒ Object
list all indices used by a query (no subqueries, unless asked for).
- #info(opt = {}) ⇒ Object
-
#initialize(model, arg = nil, &block) ⇒ Query
constructor
A new instance of Query.
- #intersect(index, val = nil) ⇒ Object (also: #∩)
- #json_redis_dump ⇒ Object
- #key(arg = nil) ⇒ Object (also: #key_for_query)
-
#key_size(redis_key = nil, r = nil) ⇒ Object
current query size.
-
#live! ⇒ Object
live queries have pending updates stored nearby.
- #live=(val) ⇒ Object
- #live? ⇒ Boolean
- #marshal_dump ⇒ Object
- #marshal_load(data) ⇒ Object
- #marshaled ⇒ Object
- #member?(id) ⇒ Boolean (also: #contains?)
- #no_optimize! ⇒ Object
- #pageable! ⇒ Object
- #pageable? ⇒ Boolean
- #paged? ⇒ Boolean
-
#param(param_name) ⇒ Object
retrieve query parameters, as fed through union and intersect and diff.
- #profiler ⇒ Object
- #query_run_stage_begin(r, q) ⇒ Object
- #query_run_stage_inspect(r, q) ⇒ Object
- #query_run_stage_prepare(r, q) ⇒ Object
- #query_run_stage_release(r, q) ⇒ Object
- #query_run_stage_reserve(r, q) ⇒ Object
- #query_run_stage_run(r, q) ⇒ Object
- #range(range) ⇒ Object
- #raw_results(*arg) ⇒ Object
- #ready?(r = nil, subs = true) ⇒ Boolean
-
#realtime! ⇒ Object
realtime queries are updated automatically, on the spot.
- #realtime? ⇒ Boolean
- #redis ⇒ Object
-
#resort ⇒ Object
apply a sort to set of existing results.
- #result(n = 0) ⇒ Object
- #result_score(id) ⇒ Object
- #result_scores(*ids) ⇒ Object
-
#results(*arg) ⇒ Object
flexible query results retriever: results(x..y) from x to y; results(x, y) same; results(x) first x results; results(x..y, :reverse) range in reverse; results(x..y, :score => a..b) results from x to y with scores in given score range; results(x..y, :with_scores) return results with result.query_score attr set to the score; results(x..y, :replace => [:foo_id, FooModel]) like an SQL join.
- #results_key(suffix = nil, raw_id = nil) ⇒ Object
- #run(opt = {}) ⇒ Object
- #run_callbacks(event, with_subqueries = true) ⇒ Object
- #run_pipeline(redis, *stages) ⇒ Object
- #run_stage(stage, r, recurse = true) {|r| ... } ⇒ Object
- #runstate_key(sub = nil) ⇒ Object
- #runstate_keys ⇒ Object
- #sort(index, reverse = nil) ⇒ Object
-
#sort_mult ⇒ Object
sort multiplier (direction) – currently, +1 or -1.
-
#sort_score(obj, arg = {}) ⇒ Object
get an object’s sorting score, or its previous sorting score if asked for.
-
#sortby(index, direction = 1) ⇒ Object
SortOp::SYMBOL.
- #sorting_by ⇒ Object
- #sorting_by?(index) ⇒ Boolean
- #static! ⇒ Object
-
#static? ⇒ Boolean
static queries are updated only after they expire.
- #structure ⇒ Object
- #subquery(arg = {}) ⇒ Object
- #subquery_id(subquery) ⇒ Object
- #temp_keys ⇒ Object
- #to_s ⇒ Object
- #trace(opt = {}) ⇒ Object
- #trace!(opt = {}) ⇒ Object
- #trace? ⇒ Boolean
-
#undo(num_operations = 1) ⇒ Object
undo the last n operations.
-
#union(index, val = nil) ⇒ Object
(also: #∪)
the set operations.
- #unpageable! ⇒ Object
-
#update(obj, arg = {}) ⇒ Object
update query results with object(s).
- #usable_as_results?(*arg) ⇒ Boolean
- #use_page(page) ⇒ Object
-
#use_redis(redis_instance) ⇒ Object
seems obsolete with the new run pipeline.
- #uses_index?(*arg) ⇒ Boolean
- #uses_index_as_results_key? ⇒ Boolean
-
#volatile_query_keys ⇒ Object
a list of all keys that need to be refreshed (ttl extended) for a query.
Constructor Details
#initialize(model, arg = nil, &block) ⇒ Query
Returns a new instance of Query.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/queris/query.rb', line 18 def initialize(model, arg=nil, &block) if model.kind_of?(Hash) and arg.nil? arg, model = model, model[:model] elsif arg.nil? arg= {} end raise ArgumentError, "Can't create query without a model" unless model raise Error, "include Queris in your model (#{model.inspect})." unless model.include? Queris @model = model @params = {} @ops = [] @sort_ops = [] @used_index = {} @redis_prefix = arg[:complete_prefix] || ((arg[:prefix] || arg[:redis_prefix] || model.prefix) + self.class.name.split('::').last + ":") @redis=arg[:redis] @profile = arg[:profiler] || model.query_profiler.new(nil, :redis => @redis || model.redis) @subqueries = [] self.ttl=arg[:ttl] || 600 #10 minutes default time-to-live @temp_key_ttl = arg[:temp_key_ttl] || 300 @trace=nil @pageable = arg[:pageable] || arg[:paged] live! if arg[:live] realtime! if arg[:realtime] @created_at = Time.now.utc if @expire_after = (arg[:expire_at] || arg[:expire] || arg[:expire_after]) raise ArgumentError, "Can't create query with expire_at option and check_staleness options at once" if arg[:check_staleness] raise ArgumentError, "Can't create query with expire_at option with track_stats disabled" if arg[:track_stats]==false arg[:track_stats]=true arg[:check_staleness] = Proc.new do |query| query.time_cached < (@expire_after || Time.at(0)) end end @from_hash = arg[:from_hash] @restore_failed_callback = arg[:restore_failed] @delete_missing = arg[:delete_missing] @track_stats = arg[:track_stats] @check_staleness = arg[:check_staleness] if block_given? instance_eval(&block) end self end |
Instance Attribute Details
#created_at ⇒ Object
Returns the value of attribute created_at.
16 17 18 |
# File 'lib/queris/query.rb', line 16 def created_at @created_at end |
#model ⇒ Object
Returns the value of attribute model.
16 17 18 |
# File 'lib/queris/query.rb', line 16 def model @model end |
#ops ⇒ Object
Returns the value of attribute ops.
16 17 18 |
# File 'lib/queris/query.rb', line 16 def ops @ops end |
#params ⇒ Object
Returns the value of attribute params.
16 17 18 |
# File 'lib/queris/query.rb', line 16 def params @params end |
#redis_prefix ⇒ Object
Returns the value of attribute redis_prefix.
16 17 18 |
# File 'lib/queris/query.rb', line 16 def redis_prefix @redis_prefix end |
#run_id ⇒ Object
Returns the value of attribute run_id.
977 978 979 |
# File 'lib/queris/query.rb', line 977 def run_id @run_id end |
#sort_ops ⇒ Object
Returns the value of attribute sort_ops.
16 17 18 |
# File 'lib/queris/query.rb', line 16 def sort_ops @sort_ops end |
#subqueries ⇒ Object (readonly)
Returns the value of attribute subqueries.
17 18 19 |
# File 'lib/queris/query.rb', line 17 def subqueries @subqueries end |
#temp_key_ttl ⇒ Object (readonly)
Returns the value of attribute temp_key_ttl.
17 18 19 |
# File 'lib/queris/query.rb', line 17 def temp_key_ttl @temp_key_ttl end |
#timer ⇒ Object
Returns the value of attribute timer.
967 968 969 |
# File 'lib/queris/query.rb', line 967 def timer @timer end |
#ttl ⇒ Object
Returns the value of attribute ttl.
17 18 19 |
# File 'lib/queris/query.rb', line 17 def ttl @ttl end |
Instance Method Details
#add_temp_key(k) ⇒ Object
316 317 318 319 |
# File 'lib/queris/query.rb', line 316 def add_temp_key(k) @temp_keys ||= {} @temp_keys[k]=true end |
#all_index_keys ⇒ Object
958 959 960 961 962 963 964 965 |
# File 'lib/queris/query.rb', line 958 def all_index_keys keys = [] [ops, sort_ops].each do |ops| ops.each { |op| keys.concat op.keys(nil, true) } end keys << key keys.uniq end |
#all_indices ⇒ Object
list all indices (including subqueries)
454 455 456 |
# File 'lib/queris/query.rb', line 454 def all_indices indices :subqueries => true end |
#all_live_indices ⇒ Object
457 458 459 460 461 462 463 464 |
# File 'lib/queris/query.rb', line 457 def all_live_indices return [] unless live? ret = all_indices ret.select! do |i| (ForeignIndex === i ? i.real_index : i).live? end ret end |
#all_query_keys ⇒ Object
all keys related to a query
326 327 328 329 330 |
# File 'lib/queris/query.rb', line 326 def all_query_keys all = volatile_query_keys all |= temp_keys all end |
#all_subqueries ⇒ Object
388 389 390 391 392 |
# File 'lib/queris/query.rb', line 388 def all_subqueries ret = subqueries.dup subqueries.each { |sub| ret.concat sub.all_subqueries } ret end |
#count(opt = {}) ⇒ Object Also known as: size, length
748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 |
# File 'lib/queris/query.rb', line 748 def count(opt={}) use_page nil run(:no_update => !realtime?) unless opt [:no_run] key = results_key case redis.type(key) when 'set' raise Error, "Query results are not a sorted set (maybe using a set index directly), can't range" if opt[:score] redis.scard key when 'zset' if opt[:score] range = opt[:score] raise ArgumentError, ":score option must be a Range, but it's a #{range.class} instead" unless Range === range first = range.begin * sort_mult last = range.exclude_end? ? "(#{range.end.to_f * sort_mult}" : range.end.to_f * sort_mult #) first, last = last, first if sort_mult == -1 redis.zcount(key, first, last) else redis.zcard key end else #not a set. 0 end end |
#delta(arg = {}) ⇒ Object
287 288 289 290 |
# File 'lib/queris/query.rb', line 287 def delta(arg={}) myredis = arg[:redis] || redis_master || redis myredis.zrange results_key(:delta), 0, -1 end |
#diff(index, val = nil) ⇒ Object Also known as: ∖
94 95 96 |
# File 'lib/queris/query.rb', line 94 def diff(index, val=nil) prepare_op DiffOp, index, val end |
#each_operand(which_ops = nil) ⇒ Object
walk through all query operands
950 951 952 953 954 955 956 |
# File 'lib/queris/query.rb', line 950 def each_operand(which_ops=nil) #walk though all query operands (which_ops == :sort ? sort_ops : ops).each do |operation| operation.operands.each do |operand| yield operand, operation end end end |
#each_subquery(recursive = true) ⇒ Object
dependency-ordered
394 395 396 397 398 399 400 401 |
# File 'lib/queris/query.rb', line 394
# Yields subqueries in dependency order: for each direct subquery, its own
# nested subqueries are yielded first, then the subquery itself.
#
# @param recursive [Boolean] when false, only the direct subqueries are
#   yielded; the default (true) walks the whole subquery tree.
# @yieldparam subquery each subquery in turn
# @return whatever +subqueries.each+ returns
def each_subquery(recursive = true) #dependency-ordered
  subqueries.each do |s|
    if recursive
      s.each_subquery do |ss|
        yield ss
      end
    end
    yield s
  end
end
#explain(opt = {}) ⇒ Object
803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 |
# File 'lib/queris/query.rb', line 803 def explain(opt={}) return "(∅)" if ops.nil? || ops.empty? first_op = ops.first r = ops.map do |op| operands = op.operands.map do |o| if Query === o.index if opt[:subqueries] != false o.index.explain opt else "{subquery #{subquery_id o.index}}" end else value = case opt[:serialize] when :json JSON.dump o.value when :ruby Marshal.dump o.value else #human-readable and sufficiently unique o.value.to_s end "#{o.index.name}#{(value.empty? || opt[:structure])? nil : "<#{value}>"}" end end op_str = operands.join " #{op.symbol} " if first_op == op op_str.insert 0, "∅ #{op.symbol} " if DiffOp === op else op_str.insert 0, " #{op.symbol} " end op_str end "(#{r.join})" end |
#first_result ⇒ Object
714 715 716 |
# File 'lib/queris/query.rb', line 714 def first_result return result 0 end |
#flush(arg = {}) ⇒ Object Also known as: clear
recursively and conditionally flush query and subqueries. arg parameters — flush query if:
:ttl - query.ttl <= ttl
:index (symbol or index or an array of them) - query uses th(is|ese) ind(ex|ices)
or flush conditionally according to passed block: flush {|query| true }. When no parameters or block are present, flush only this query and no subqueries.
472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 |
# File 'lib/queris/query.rb', line 472
# Recursively and conditionally flushes this query and its subqueries.
#
# @param arg [Hash] conditions; this query is flushed when
#   :ttl   - query.ttl <= ttl, or
#   :index - the query uses the given index (or any of an array of them).
# @yieldparam query [Query] optional predicate; a truthy result flushes.
# @return [Integer] 0 when the results key is borrowed from an index, otherwise
#   the sum of the DEL results for this query and its flushed subqueries.
def flush(arg = {}, &block)
  model.run_query_callbacks :before_flush, self
  return 0 if uses_index_as_results_key?
  flushed = 0
  if block_given?
    # The block is captured explicitly and forwarded; a bare Proc.new with no
    # block of its own raises ArgumentError on Ruby 3.0 and later.
    subqueries.each { |sub| flushed += sub.flush(arg, &block) }
  elsif arg.count > 0
    subqueries.each { |sub| flushed += sub.flush arg }
  end
  # NOTE(review): with an empty arg the `arg.count==0` term short-circuits, so
  # the block is never consulted for this query itself — confirm intended.
  if flushed > 0 || arg.count==0 || ttl <= (arg[:ttl] || 0) || (uses_index?(*arg[:index])) || block_given? && (yield self)
    #this only works because of the slave EXPIRE hack requiring dummy query results_keys on master.
    #otherwise, we'd have to create the key first (in a MULTI, of course)
    n_deleted = 0
    (redis_master || redis).multi do |r|
      all = all_query_keys
      all.each do |k|
        r.setnx k, 'baleted'
      end
      n_deleted = r.del all
    end
    @known_to_exist = nil
    flushed += n_deleted.value
  end
  flushed
end
#gather_ready_data(r) ⇒ Object
1122 1123 1124 1125 1126 |
# File 'lib/queris/query.rb', line 1122 def gather_ready_data(r) unless paged? @ready = Queris.run_script :unpaged_query_ready, r, [ results_key, results_key(:exists), runstate_key(:ready) ] end end |
#id ⇒ Object
744 745 746 |
# File 'lib/queris/query.rb', line 744 def id Queris.digest results_key end |
#indices(opt = {}) ⇒ Object
list all indices used by a query (no subqueries, unless asked for)
440 441 442 443 444 445 446 447 448 449 450 451 452 |
# File 'lib/queris/query.rb', line 440 def indices(opt={}) ret = @used_index.dup if opt[:subqueries] subqueries.each do |sub| ret.merge! sub.indices(subqueries: true, hash: true) end end if opt[:hash] ret else ret.values end end |
#info(opt = {}) ⇒ Object
844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 |
# File 'lib/queris/query.rb', line 844 def info(opt={}) ind = opt[:indent] || "" info = "#{ind}#{self} info:\r\n" info << "#{ind}key: #{results_key}\r\n" unless opt[:no_key] info << "#{ind}redis key type:#{redis.type key}, size: #{count :no_run => true}\r\n" unless opt[:no_size] info << "#{ind}liveliness:#{live? ? (realtime? ? 'realtime' : 'live') : 'static'}" if live? #live_keyscores = redis.zrange(results_key(:live), 0, -1, :with_scores => true) #info << live_keyscores.map{|i,v| "#{i}:#{v}"}.join(", ") info << " (live indices: #{redis.zcard results_key(:live)})" end info << "\r\n" info << "#{ind}id: #{id}, ttl: #{ttl}, sort: #{sorting_by || "none"}\r\n" unless opt[:no_details] info << "#{ind}remaining ttl: results: #{redis.pttl(results_key)}ms, existence flag: #{redis.pttl results_key(:exists)}ms, marshaled: #{redis.pttl results_key(:marshaled)}ms\r\n" if opt[:debug_ttls] unless @subqueries.empty? || opt[:no_subqueries] info << "#{ind}subqueries:\r\n" @subqueries.each do |sub| info << sub.info(opt.merge(:indent => ind + " ", :output=>false)) end end opt[:output]!=false ? puts(info) : info end |
#intersect(index, val = nil) ⇒ Object Also known as: ∩
89 90 91 |
# File 'lib/queris/query.rb', line 89 def intersect(index, val=nil) prepare_op IntersectOp, index, val end |
#json_redis_dump ⇒ Object
898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 |
# File 'lib/queris/query.rb', line 898 def json_redis_dump queryops = [] ops.each {|op| queryops.concat(op.json_redis_dump)} queryops.reverse! sortops = [] sort_ops.each{|op| sortops.concat(op.json_redis_dump)} ret = { key: results_key, live: live?, realtime: realtime?, ops_reverse: queryops, sort_ops: sortops } ret end |
#key(arg = nil) ⇒ Object Also known as: key_for_query
739 740 741 |
# File 'lib/queris/query.rb', line 739 def key arg=nil results_key end |
#key_size(redis_key = nil, r = nil) ⇒ Object
current query size
775 776 777 |
# File 'lib/queris/query.rb', line 775 def key_size(redis_key=nil, r=nil) Queris.run_script :multisize, (r || redis), [redis_key || key] end |
#live! ⇒ Object
live queries have pending updates stored nearby
249 |
# File 'lib/queris/query.rb', line 249 def live!; @live=true; @realtime=false; validate_ttl; self; end |
#live=(val) ⇒ Object
242 |
# File 'lib/queris/query.rb', line 242 def live=(val); @live= val; end |
#live? ⇒ Boolean
241 |
# File 'lib/queris/query.rb', line 241 def live?; @live; end |
#marshal_dump ⇒ Object
867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 |
# File 'lib/queris/query.rb', line 867 def marshal_dump subs = {} @subqueries.each { |sub| subs[sub.id.to_sym]=sub.marshal_dump } unique_params = params.dup each_operand do |op| unless Query === op.index param_name = op.index.name unique_params.delete param_name if params[param_name] == op.value end end { model: model.name.to_sym, ops: ops.map{|op| op.marshal_dump}, sort_ops: sort_ops.map{|op| op.marshal_dump}, subqueries: subs, params: unique_params, args: { complete_prefix: redis_prefix, ttl: ttl, expire_after: @expire_after, track_stats: @track_stats, live: @live, realtime: @realtime, from_hash: @from_hash, delete_missing: @delete_missing, pageable: pageable? } } end |
#marshal_load(data) ⇒ Object
914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 |
# File 'lib/queris/query.rb', line 914 def marshal_load(data) if Hash === data initialize Queris.model(data[:model]), data[:args] subqueries = {} data[:subqueries].map do |id, sub| q = Query.allocate q.marshal_load sub subqueries[id]=q end [ data[:ops], data[:sort_ops] ].each do |operations| #replay all query operations operations.each do |operation| operation.last.each do |op| index = subqueries[op[0]] || @model.redis_index(op[0]) self.send operation.first, index, op.last end end end data[:params].each do |name, val| params[name]=val end else #legacy if data.kind_of? String arg = JSON.load(data) elsif data.kind_of? Enumerable arg = data else raise Query::Error, "Reloading query failed. data: #{data.to_s}" #arg = [] #SILENTLY FAIL RELOADING QUERY. THIS IS A *DANGER*OUS DESIGN DECISION MADE FOR THE SAKE OF CONVENIENCE. end arg.each { |n,v| instance_variable_set "@#{n}", v } end end |
#marshaled ⇒ Object
946 947 948 |
# File 'lib/queris/query.rb', line 946 def marshaled Marshal.dump self end |
#member?(id) ⇒ Boolean Also known as: contains?
687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 |
# File 'lib/queris/query.rb', line 687 def member?(id) id = id.id if model === id use_page nil run :no_update => !realtime? case t = redis.type(results_key) when 'set' redis.sismember(results_key, id) when 'zset' !redis.zrank(results_key, id).nil? when 'none' false else raise ClientError, "unexpected result set type #{t}" end end |
#no_optimize! ⇒ Object
342 343 344 345 |
# File 'lib/queris/query.rb', line 342 def no_optimize! @no_optimize=true subqueries.each &:no_optimize! end |
#pageable! ⇒ Object
680 681 682 |
# File 'lib/queris/query.rb', line 680 def pageable! @pageable = true end |
#pageable? ⇒ Boolean
677 678 679 |
# File 'lib/queris/query.rb', line 677 def pageable? @pageable end |
#paged? ⇒ Boolean
674 675 676 |
# File 'lib/queris/query.rb', line 674 def paged? @page end |
#param(param_name) ⇒ Object
retrieve query parameters, as fed through union and intersect and diff
79 80 81 |
# File 'lib/queris/query.rb', line 79 def param(param_name) @params[param_name.to_sym] end |
#profiler ⇒ Object
237 238 239 |
# File 'lib/queris/query.rb', line 237 def profiler @profile end |
#query_run_stage_begin(r, q) ⇒ Object
1044 1045 1046 1047 1048 1049 |
# File 'lib/queris/query.rb', line 1044 def query_run_stage_begin(r,q) if uses_index_as_results_key? @trace. "Using index as results key." if @trace_callback end new_run #how procedural... end |
#query_run_stage_inspect(r, q) ⇒ Object
1050 1051 1052 1053 1054 1055 1056 |
# File 'lib/queris/query.rb', line 1050 def query_run_stage_inspect(r,q) gather_ready_data r if live? @itshere[:marshaled]=r.exists results_key(:marshaled) @itshere[:live]=r.exists results_key(:live) end end |
#query_run_stage_prepare(r, q) ⇒ Object
1077 1078 1079 |
# File 'lib/queris/query.rb', line 1077 def query_run_stage_prepare(r,q) return unless @reserved end |
#query_run_stage_release(r, q) ⇒ Object
1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 |
# File 'lib/queris/query.rb', line 1093 def query_run_stage_release(r,q) return unless @reserved r.setnx results_key(:exists), 1 if paged? && fluxcap(@already_exists) Queris.run_script :master_expire, r, volatile_query_keys, [ ttl, nil, true ] Queris.run_script :master_expire, r, reusable_temp_keys, [ temp_key_ttl, nil, true ] else Queris.run_script :master_expire, r, volatile_query_keys, [ ttl , true, true] Queris.run_script :master_expire, r, reusable_temp_keys, [ temp_key_ttl, nil, true ] end r.del q.runstate_keys @reserved = nil #make sure volatile keys don't overstay their welcome min = Query::MINIMUM_QUERY_TTL Queris.run_script :undo_add_low_ttl, r, [ runstate_key(:low_ttl) ], [ min ] end |
#query_run_stage_reserve(r, q) ⇒ Object
1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 |
# File 'lib/queris/query.rb', line 1057 def query_run_stage_reserve(r,q) return if ready? @reserved = true if live? #marshaled query r.setex results_key(:marshaled), ttl, JSON.dump(json_redis_dump) unless fluxcap @itshere[:marshaled] unless fluxcap @itshere[:live] now=Time.now.utc.to_f all_live_indices.each do |i| r.zadd results_key(:live), now, i.live_delta_key end r.expire results_key(:live), ttl end end @already_exists = r.get results_key(:exists) if paged? #make sure volatile keys don't disappear Queris.run_script :add_low_ttl, r, [ *volatile_query_keys, runstate_key(:low_ttl) ], [ Query::MINIMUM_QUERY_TTL ] end |
#query_run_stage_run(r, q) ⇒ Object
1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 |
# File 'lib/queris/query.rb', line 1081 def query_run_stage_run(r,q) return unless @reserved unless ready? run_static_query r else if live? @live_update_msg = Queris.run_script(:update_query, r, [results_key(:marshaled), results_key(:live)], [Time.now.utc.to_f]) end end end |
#range(range) ⇒ Object
500 501 502 503 504 505 506 |
# File 'lib/queris/query.rb', line 500 def range(range) raise ArgumentError, "Range, please." unless Range === range pg=make_page pg.range=range use_page pg self end |
#raw_results(*arg) ⇒ Object
653 654 655 656 |
# File 'lib/queris/query.rb', line 653
# Same as #results, with the :raw option switched on.
#
# The flag is merged into the trailing options hash (a copy of it, or a new
# one) rather than appended as a bare :raw symbol after it: #results only
# treats a hash as options when it is the last argument, so appending after
# the hash made the caller's options get ignored.
#
# @param arg the arguments accepted by #results
# @return whatever #results returns
def raw_results(*arg)
  opt = Hash === arg.last ? arg.pop.dup : {}
  opt[:raw] = true
  results(*arg, opt)
end
#ready?(r = nil, subs = true) ⇒ Boolean
1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 |
# File 'lib/queris/query.rb', line 1112 def ready?(r=nil, subs=true) return true if uses_index_as_results_key? || fluxcap(@ready) r ||= redis ready = if paged? @page.ready? else fluxcap @ready end ready end |
#realtime! ⇒ Object
realtime queries are updated automatically, on the spot
251 252 253 254 |
# File 'lib/queris/query.rb', line 251
# Marks the query as realtime. This goes through live! first (which resets
# @realtime) and then raises the realtime flag.
#
# @return [Query] self, so the call can be chained the same way live! can
def realtime!
  live!
  @realtime = true
  self
end
#realtime? ⇒ Boolean
255 256 257 |
# File 'lib/queris/query.rb', line 255 def realtime? live? && @realtime end |
#redis ⇒ Object
68 69 70 |
# File 'lib/queris/query.rb', line 68 def redis @redis || Queris.redis(:slave) || model.redis || redis_master end |
#resort ⇒ Object
apply a sort to set of existing results
232 233 234 235 |
# File 'lib/queris/query.rb', line 232 def resort #apply a sort to set of existing results @resort=true self end |
#result(n = 0) ⇒ Object
705 706 707 708 709 710 711 712 |
# File 'lib/queris/query.rb', line 705 def result(n=0) res = results(n...n+1) if res.length > 0 res.first else nil end end |
#result_score(id) ⇒ Object
644 645 646 |
# File 'lib/queris/query.rb', line 644 def result_score(id) result_scores([id]).first end |
#result_scores(*ids) ⇒ Object
631 632 633 634 635 636 637 638 639 640 641 642 |
# File 'lib/queris/query.rb', line 631
# Looks up the ZSCORE of several ids in the results key inside one MULTI.
#
# @param ids ids to look up, given either as separate arguments or as a
#   single array
# @return whatever +redis.multi+ returns for the queued ZSCORE commands
def result_scores(*ids)
  ids = ids[0] if ids.count == 1 && Array === ids[0]
  key = results_key
  redis.multi do |r|
    ids.each do |id|
      # Queue on the object yielded by multi, not on the outer connection.
      r.zscore(key, id)
    end
  end
end
#results(*arg) ⇒ Object
flexible query results retriever: results(x..y) from x to y; results(x, y) same; results(x) first x results; results(x..y, :reverse) range in reverse; results(x..y, :score => a..b) results from x to y with scores in given score range; results(x..y, :with_scores) return results with result.query_score attr set to the score; results(x..y, :replace => [:foo_id, FooModel]) like an SQL join: returns results replaced by FooModels with ids from foo_id.
516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 |
# File 'lib/queris/query.rb', line 516
# Flexible query results retriever. Runs the query, then reads the results key.
#
#   results(x..y)                      from x to y
#   results(x, y)                      same
#   results(x..y, :reverse)            range in reverse
#   results(x..y, :score => a..b)      limited to the given score range
#   results(x..y, :with_scores)        results together with their scores
#   results(x..y, :replace => [:foo_id, FooModel])  like an SQL join
#
# @param arg positional range/flags (:reverse, :with_scores, :raw), optionally
#   followed by an options hash (:range, :score, :reverse, :with_scores, :raw,
#   :replace, :replace_command)
# @yield with :replace_command, receives (cmd, key, first, last, rangeopt) and
#   its value becomes the result; otherwise receives each result and nil
#   values are dropped
# @return [Array] the results; [] when the results key is neither a set nor a
#   sorted set
# @raise [ArgumentError] on a malformed :range or :score
def results(*arg, &block)
  # Copy a trailing options hash so the flags set below stay out of the caller's hash.
  opt = Hash === arg.last ? arg.pop.dup : {}
  opt[:reverse]=true if arg.member?(:reverse)
  opt[:with_scores]=true if arg.member?(:with_scores)
  opt[:range]=arg.shift if Range === arg.first
  opt[:range]=(arg.shift .. arg.shift) if Numeric === arg[0] && arg[0].class == arg[1].class
  opt[:range]=(0..arg.shift) if Numeric === arg[0]
  opt[:raw] = true if arg.member? :raw
  if opt[:range] && !sort_ops.empty? && pageable?
    range opt[:range]
    run :no_update => !realtime?
  else
    use_page nil
    run :no_update => !realtime?
  end
  @timer.start("results") if @timer
  key = results_key
  case redis.type(key)
  when 'set'
    raise Error, "Can't range by score on a regular results set" if opt[:score]
    # NOTE(review): NotImplemented is not a core Ruby class (that would be
    # NotImplementedError); presumably defined elsewhere in Queris — confirm.
    raise NotImplemented, "Cannot get result range from shortcut index result set (not sorted); must retrieve all results. This is a temporary queris limitation." if opt[:range]
    cmd, first, last, rangeopt = :smembers, nil, nil, {}
  when 'zset'
    rangeopt = {}
    rangeopt[:with_scores] = true if opt[:with_scores]
    if (r = opt[:range])
      first, last = r.begin, r.end - (r.exclude_end? ? 1 : 0)
      raise ArgumentError, "Query result range must have numbers, instead there's a #{first.class} and #{last.class}" unless Numeric === first && Numeric === last
      raise ArgumentError, "Query results range must have integer endpoints" unless first.round == first && last.round == last
    end
    if (scrange = opt[:score])
      rangeopt[:limit] = [ first, last - first ] if opt[:range]
      raise NotImplemented, "Can't fetch results with_scores when also limiting them by score. Pick one or the other." if opt[:with_scores]
      raise ArgumentError, "Query.results :score parameter must be a Range" unless Range === scrange
      first = Queris::to_redis_float(scrange.begin * sort_mult)
      last = Queris::to_redis_float(scrange.end * sort_mult)
      last = "(#{last}" if scrange.exclude_end? #)
      first, last = last, first if sort_mult == -1
      cmd = opt[:reverse] ? :zrevrangebyscore : :zrangebyscore
    else
      cmd = opt[:reverse] ? :zrevrange : :zrange
    end
  else
    return []
  end
  @timer.start :results if @timer
  if block_given? && opt[:replace_command]
    res = yield cmd, key, first || 0, last || -1, rangeopt
  elsif @from_hash && !opt[:raw]
    if rangeopt[:limit]
      limit, offset = *rangeopt[:limit]
    else
      limit, offset = nil, nil
    end
    if opt[:replace]
      replace=opt[:replace]
      replace_id_attr=replace.first
      replace_model=replace.last
      raise Queris::Error, "replace model must be a Queris model, is instead #{replace_model}" unless replace_model < Queris::Model
      replace_keyf=replace_model.keyf
    end
    raw_res, ids, failed_i = redis.evalsha(Queris::script_hash(:results_from_hash), [key], [cmd, first || 0, last || -1, @from_hash, limit, offset, rangeopt[:with_scores] && :true, replace_keyf, replace_id_attr])
    res, to_be_deleted = [], []
    result_model= replace.nil? ? model : replace_model
    raw_res.each_with_index do |raw_hash, i|
      my_id = ids[i]
      if failed_i.first == i
        failed_i.shift
        obj = result_model.find_cached my_id, :assume_missing => true
      else
        unless (obj = result_model.restore(raw_hash, my_id))
          #we could still have received an invalid cache object (too few attributes, for example)
          obj = result_model.find_cached my_id, :assume_missing => true
        end
      end
      if not obj.nil?
        res << obj
      elsif @delete_missing
        to_be_deleted << my_id
      end
    end
    redis.evalsha Queris.script_hash(:remove_from_sets), all_index_keys, to_be_deleted unless to_be_deleted.empty?
  else
    if cmd == :smembers
      res = redis.send(cmd, key)
    else
      res = redis.send(cmd, key, first || 0, last || -1, rangeopt)
    end
  end
  if block_given? && !opt[:replace_command]
    if opt[:with_scores]
      ret = []
      res.each do |result|
        obj = yield result.first
        ret << [obj, result.last] unless obj.nil?
      end
      res = ret
    else
      # Forward the captured block; a bare Proc.new with no block of its own
      # raises ArgumentError on Ruby 3.0 and later.
      res.map!(&block).compact!
    end
  end
  if @timer
    @timer.finish :results
    #puts "Timing for #{self}: \r\n #{@timer}"
  end
  res
end
#results_key(suffix = nil, raw_id = nil) ⇒ Object
718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 |
# File 'lib/queris/query.rb', line 718 def results_key(suffix = nil, raw_id = nil) if @results_key.nil? or raw_id if (reused_set_key = uses_index_as_results_key?) @results_key = reused_set_key else theid = raw_id || (Queris.digest(explain :subqueries => false) << ":subqueries:#{(@subqueries.length > 0 ? @subqueries.map{|q| q.id}.sort.join('&') : 'none')}" << ":sortby:#{sorting_by || 'nothing'}") thekey = "#{@redis_prefix}results:#{theid}" thekey << ":paged:#{@page.source_id}" if paged? if raw_id return thekey else @results_key = thekey end end end if suffix "#{@results_key}:#{suffix}" else @results_key end end |
#run(opt = {}) ⇒ Object
1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 |
# File 'lib/queris/query.rb', line 1129 def run(opt={}) raise ClientError, "No redis connection found for query #{self} for model #{self.model.name}." if redis.nil? @timer = Query::Timer.new #parse run options force = opt[:force] force = nil if Numeric === force && force <= 0 if trace? @trace= Trace.new self, @must_trace end run_callbacks :before_run if uses_index_as_results_key? #do nothing, we're using a results key directly from an index which is guaranteed to already exist @trace. "Using index as results key." if @trace return count(:no_run => true) end Queris::RedisStats. = true #run this sucker #the following logic must apply to ALL queries (and subqueries), #since all queries run pipeline stages in 'parallel' (on the same redis pipeline) @timer.start :time run_pipeline redis, :begin do |r, q| #run live index callbacks all_live_indices.each do |i| if i.respond_to? :before_query i.before_query r, self end end run_stage :inspect, r @page.inspect_query(r, self) if paged? end if !ready? || force run_pipeline redis_master, :reserve if paged? begin @page.seek run_pipeline redis, :prepare, :run, :after_run do |r, q| @page.inspect_query(r, self) end end until @page.ready? else run_pipeline redis, :prepare, :run, :after_run end run_pipeline redis_master, :release else if live? && !opt[:no_update] @timer.start :live_update live_update_msg = Queris.run_script(:update_query, redis, [results_key(:marshaled), results_key(:live)], [Time.now.utc.to_f]) @trace. "Live query update: #{live_update_msg}" if @trace @timer.finish :live_update end @trace. "Query results already exist, no trace available." if @trace end @timer.finish :time Queris::RedisStats. = false end |
#run_callbacks(event, with_subqueries = true) ⇒ Object
1038 1039 1040 1041 1042 |
# File 'lib/queris/query.rb', line 1038
# Fires the model-level query callbacks for +event+.
#
# @param event [Symbol] callback name handed to model.run_query_callbacks
# @param with_subqueries [Boolean] when truthy, each subquery's model callbacks
#   are fired first (via each_subquery), then this query's own
# @return [self]
def run_callbacks(event, with_subqueries=true)
  if with_subqueries
    each_subquery do |subq|
      subq.model.run_query_callbacks(event, subq)
    end
  end
  model.run_query_callbacks(event, self)
  self
end
#run_pipeline(redis, *stages) ⇒ Object
1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 |
# File 'lib/queris/query.rb', line 1024
# Runs the given stages, in order, inside one `redis.pipelined` block and times
# the whole pipeline under the label "pipeline <stage,stage,...>".
#
# @param redis [#pipelined] connection on which the pipeline is opened
# @param stages [Array<Symbol>] stage names passed one by one to run_stage
# @yield [r, query] optionally called after all stages, still inside the pipeline
# @return [Object] whatever @timer.finish returns
def run_pipeline(redis, *stages)
  label = "pipeline #{stages.join ','}"
  @timer.start label
  redis.pipelined do |conn|
    stages.each { |stage| run_stage(stage, conn) }
    yield(conn, self) if block_given?
  end
  @timer.finish label
end
#run_stage(stage, r, recurse = true) {|r| ... } ⇒ Object
990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 |
# File 'lib/queris/query.rb', line 990
# Runs one pipeline stage by dispatching "query_run_stage_<stage>" to every
# participant that responds to it, in this order: the page (when paged),
# subqueries, each operation and its operands (and the operand's index when the
# operand is not itself a query), and finally this query.
#
# @param stage [Symbol, String] stage name; the dispatched method is query_run_stage_<stage>
# @param r [Object] the redis (pipeline) handle passed through to every stage method
# @param recurse [Boolean] when false, subqueries are NOT walked. The subquery walk
#   itself passes false, so a nested subquery is no longer re-run by each of its
#   ancestors (previously this flag was accepted but ignored).
# @yield [r] optionally called before anything else
def run_stage(stage, r, recurse=true)
  method_name = "query_run_stage_#{stage}".to_sym
  yield(r) if block_given?

  # page first
  @page.send method_name, r, self if paged? && @page.respond_to?(method_name)

  # then subqueries
  if recurse
    each_subquery do |sub|
      # assumes this iterator respects subquery dependency ordering (deepest subqueries first)
      sub.run_stage stage, r, false
    end
  end

  # now operations, indices etc.
  [ops, sort_ops].each do |operations|
    operations.each do |operation|
      operation.send method_name, r, self if operation.respond_to? method_name
      operation.operands.each do |operand|
        operand.send method_name, r, self if operand.respond_to? method_name
        if !operand.is_query? && operand.index.respond_to?(method_name)
          operand.index.send method_name, r, self
        end
      end
    end
  end

  # and finally, the meat
  self.send method_name, r, self if respond_to? method_name
end
#runstate_key(sub = nil) ⇒ Object
982 983 984 985 986 987 988 |
# File 'lib/queris/query.rb', line 982
# Builds a run-state key of the form "<redis_prefix>run:<run_id>[:<sub>]" and
# records it in @runstate_keys so runstate_keys can list it later.
#
# @param sub [#to_s, nil] optional sub-key appended after a colon
# @return [String] the key
def runstate_key(sub=nil)
  @runstate_keys ||= {}
  key = "#{redis_prefix}run:#{run_id}"
  key = "#{key}:#{sub}" if sub
  @runstate_keys[key] = true
  key
end
#runstate_keys ⇒ Object
978 979 980 981 |
# File 'lib/queris/query.rb', line 978
# Lists every run-state key recorded so far in @runstate_keys.
#
# @return [Array<String>] recorded keys (empty when none were recorded yet)
def runstate_keys
  (@runstate_keys ||= {}).keys
end
#sort(index, reverse = nil) ⇒ Object
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/queris/query.rb', line 141
# Sets (or clears) the sort order of this query and rebuilds its page.
#
# @param index [Query, String, Symbol, Object, nil, false]
#   * a Query over the same model: its sort ops are copied here and the source
#     query is then unsorted (legacy behavior); a source query without sort ops
#     leaves this query's sort ops untouched
#   * an index or index name; a leading '-' means reverse, a leading '+' means forward
#   * nil/false: clears the sort
# @param reverse [Boolean, nil] sort direction handed to SortOp#push; overridden by a '+'/'-' prefix
# @return [self]
# @raise [ArgumentError] when the given query is over a different model, or the
#   resolved index is not a RangeIndex
def sort(index, reverse = nil)
  @results_key = nil
  if Query === index
    raise ArgumentError, "Sort can't be extracted from queries over different models." unless index.model == model
    sort_query = index
    unless sort_query.sort_ops.empty?
      # copy sort from another query, registering every plain index it sorts by.
      # The guard must look at the operand's index: `index` itself is always a
      # Query in this branch, so testing it skipped use_index unconditionally.
      sort_query.sort_ops.each do |op|
        op.operands.each do |operand|
          use_index operand.index unless Query === operand.index
        end
      end
      self.sort_ops = sort_query.sort_ops.dup
      sort_query.sort false # unsort sorted query - legacy behavior, probably a bad idea in the long run
    end
  else
    # accept a minus (or plus) sign in front of the index name to mean reverse (or forward)
    if index.respond_to?('[]')
      if index[0] == '-'
        reverse, index = true, index[1..-1]
      elsif index[0] == '+'
        reverse, index = false, index[1..-1]
      end
    end
    if index
      index = use_index index # accept string index names and indices and queries
      real_index = ForeignIndex === index ? index.real_index : index
      raise ArgumentError, "Must have a RangeIndex for sorting, found #{real_index.class.name}" unless RangeIndex === real_index
      self.sort_ops.clear << SortOp.new.push(index, reverse)
    else
      self.sort_ops.clear
    end
  end
  use_page make_page
  self
end
#sort_mult ⇒ Object
sort multiplier (direction) – currently, +1 or -1
227 228 229 230 |
# File 'lib/queris/query.rb', line 227
# Sort multiplier (direction) -- currently, +1 or -1.
#
# @return [Numeric] 1 when the query is unsorted, otherwise the value of the
#   first operand of the first sort op
def sort_mult
  current = sort_ops
  if current.empty?
    1
  else
    current.first.operands.first.value
  end
end
#sort_score(obj, arg = {}) ⇒ Object
get an object’s sorting score, or its previous sorting score if asked for
205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/queris/query.rb', line 205
# Computes an object's sorting score: the sum, over every sort operand, of
# operand.value * attribute value (nil counts as 0, values are converted with to_f).
#
# @param obj [Object] object responding to each sort index's attribute
#   (and to "<attribute>_was" when :previous is requested)
# @param arg [Hash] :previous => truthy reads "<attribute>_was" instead of the attribute
# @return [Numeric] 0 when there are no sort operands, otherwise a Float
def sort_score(obj, arg={})
  sort_ops.inject(0) do |total, op|
    op.operands.inject(total) do |subtotal, operand|
      reader = arg[:previous] ? "#{operand.index.attribute}_was" : operand.index.attribute
      subtotal + operand.value * (obj.send(reader) || 0).to_f
    end
  end
end
#sortby(index, direction = 1) ⇒ Object
SortOp::SYMBOL
178 179 180 |
# File 'lib/queris/query.rb', line 178
# Convenience wrapper around #sort taking a numeric direction.
#
# @param index [Object] passed through to #sort
# @param direction [Integer] -1 sorts in reverse; any other value sorts forward
# @return [Object] whatever #sort returns
def sortby(index, direction = 1)
  descending = (direction == -1)
  sort index, descending
end
#sorting_by ⇒ Object
221 222 223 224 225 226 |
# File 'lib/queris/query.rb', line 221
# Describes the current sort as a symbol: index names joined by '+', each
# prefixed with '-' when its operand value is negative.
#
# @return [Symbol, nil] e.g. :"-price+name"; nil when the description is empty
def sorting_by
  per_op = sort_ops.map do |op|
    terms = op.operands.map do |operand|
      sign = operand.value < 0 ? '-' : ''
      "#{sign}#{operand.index.name}"
    end
    terms.join('+')
  end
  description = per_op.join('+')
  description.empty? ? nil : description.to_sym
end
#sorting_by?(index) ⇒ Boolean
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/queris/query.rb', line 182
# Tells whether the query is currently sorted by the given index in the given direction.
#
# @param index [String, Symbol, Object] index or index name; a leading '-' asks
#   about the reverse direction (operand value -1), a leading '+' about the forward
#   direction (operand value 1), matching the prefixes accepted by #sort
# @return [true, nil] true when a sort operand uses that index with that value; nil
#   otherwise, including when the index lookup on the model raises
def sorting_by?(index)
  val = 1
  if index.respond_to?("[]") && !(Index === index)
    if index[0] == '-'
      index, val = index[1..-1], -1
    elsif index[0] == '+'
      # '+' is the explicit forward prefix understood by #sort; strip it so the
      # lookup below sees the bare index name
      index = index[1..-1]
    end
  end
  begin
    index = @model.redis_index(index)
  rescue
    # an unknown index simply means "not sorting by it"
    index = nil
  end
  if index
    sort_ops.each do |op|
      op.operands.each do |operand|
        return true if operand.index == index && operand.value == val
      end
    end
  end
  nil
end
#static! ⇒ Object
245 246 247 |
# File 'lib/queris/query.rb', line 245
# Marks the query as static by clearing both the @live and @realtime flags.
#
# @return [self]
def static!
  @live = @realtime = false
  self
end
#static? ⇒ Boolean
static queries are updated only after they expire
244 |
# File 'lib/queris/query.rb', line 244
# static queries are updated only after they expire
#
# Uses the live? predicate; the previous body called the live! mutator, so merely
# asking the question changed the query's state.
#
# @return [Boolean] true when the query is not live
def static?
  !live?
end
#structure ⇒ Object
840 841 842 |
# File 'lib/queris/query.rb', line 840
# Shorthand for explain with the :structure option switched on.
#
# @return [Object] whatever explain returns for :structure => true
def structure
  explain(structure: true)
end
#subquery(arg = {}) ⇒ Object
779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 |
# File 'lib/queris/query.rb', line 779
# Adopts an existing query as a subquery, or creates a new one from options.
#
# @param arg [Query, Hash] a Query over the same model to adopt, or an options
#   hash for a new query of this class. The hash is no longer modified: :model,
#   :complete_prefix and :ttl are set on a merged copy (previously :model was
#   written into the caller's hash).
# @return [Query] the adopted or newly created subquery; it is pointed at this
#   query's redis and appended to @subqueries the first time it is seen
# @raise [Error] when the given query is over a different model
def subquery(arg = {})
  adopting = arg.kind_of?(Query)
  if adopting
    # adopt a given query as subquery
    raise Error, "Trying to use a subquery from a different model" unless arg.model == model
  end
  @used_subquery ||= {}
  @results_key = nil
  subq =
    if adopting
      arg
    else
      # create new subquery, always over this query's model
      options = arg.merge(:model => model, :complete_prefix => redis_prefix, :ttl => @ttl)
      self.class.new(model, options)
    end
  subq.use_redis redis
  unless @used_subquery[subq]
    @used_subquery[subq] = true
    @subqueries << subq
  end
  subq
end
#subquery_id(subquery) ⇒ Object
799 800 801 |
# File 'lib/queris/query.rb', line 799
# Position of the given subquery within @subqueries.
#
# @param subquery [Object] the subquery to look up
# @return [Integer, nil] its index, or nil when it is not a subquery of this query
def subquery_id(subquery)
  @subqueries.find_index(subquery)
end
#temp_keys ⇒ Object
321 322 323 324 |
# File 'lib/queris/query.rb', line 321
# Lists the keys recorded in @temp_keys.
#
# @return [Array] recorded keys (empty when none were recorded yet)
def temp_keys
  (@temp_keys ||= {}).keys
end
#to_s ⇒ Object
836 837 838 |
# File 'lib/queris/query.rb', line 836
# String form of the query: the output of explain with default options.
#
# @return [Object] whatever explain returns
def to_s
  explain()
end
#trace(opt = {}) ⇒ Object
414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 |
# File 'lib/queris/query.rb', line 414
# Renders a human-readable report for this query: its key, ttl, the redis type
# and size of its results key, and the trace recorded by the last traced run.
#
# @param opt [Hash]
#   :indent => number of spaces to indent every line with (default 0; a
#     non-zero indent also labels the report "Subquery" instead of "Query")
#   :output => false to return the report instead of printing it with puts
# @return [String, nil] the report when :output is false, otherwise puts' result
def trace(opt={})
  indent = opt[:indent] || 0
  pad = " " * indent
  label = indent == 0 ? 'Query' : 'Subquery'
  buf = "#{pad}#{label} #{self}:\r\n"
  buf << "#{pad}key: #{key}\r\n"
  buf << "#{pad}ttl:#{ttl}, |#{redis.type(key)} results key|=#{count(:no_run => true)}\r\n"
  buf << "#{pad}trace:\r\n"
  if @trace.nil?
    buf << "#{pad}No trace available, query hasn't been run yet."
  elsif false == @trace
    buf << "#{pad}No trace available, query was run without :trace parameter. Try query.run(:trace => true)"
  else
    buf << @trace.indent(indent).to_s
  end
  if opt[:output] != false
    puts(buf)
  else
    buf
  end
end
#trace!(opt = {}) ⇒ Object
403 404 405 406 407 408 409 410 |
# File 'lib/queris/query.rb', line 403
# Sets the @must_trace flag consulted by trace?.
#
# @param opt [Object] false switches tracing off; any other non-nil value
#   (the default is an empty hash) is stored as the tracing options; nil leaves
#   the current setting untouched
# @return [self]
def trace!(opt={})
  @must_trace = opt unless opt.nil?
  self
end
#trace? ⇒ Boolean
411 412 413 |
# File 'lib/queris/query.rb', line 411
# Whether tracing is requested (@must_trace) or a trace already exists (@trace).
#
# @return [Object] @must_trace when truthy, otherwise @trace
def trace?
  @must_trace ? @must_trace : @trace
end
#undo(num_operations = 1) ⇒ Object
undo the last n operations
128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/queris/query.rb', line 128
# Undoes the last n operations: each step drops the last operand of the last
# op, and drops the op itself once it has no operands left.
#
# When the dropped operand is a subquery it is removed from `subqueries` AND
# from the @used_subquery registry; previously the registry entry was left
# behind, so #subquery would refuse to re-register that query later.
#
# @param num_operations [Integer] how many operands to undo; stops early when no ops remain
# @return [self]
# @raise [ClientError] when the last op has no operands
def undo(num_operations = 1)
  remaining = num_operations
  until remaining == 0 || ops.empty?
    @results_key = nil
    op = ops.last
    last_operand = op.operands.last
    raise ClientError, "Unexpected operand-less query operation" unless last_operand
    sub = last_operand.index
    if Query === sub
      subqueries.delete sub
      @used_subquery.delete(sub) if @used_subquery
    end
    op.operands.pop
    ops.pop if op.operands.empty?
    remaining -= 1
  end
  self
end
#union(index, val = nil) ⇒ Object Also known as: ∪
the set operations
84 85 86 |
# File 'lib/queris/query.rb', line 84
# Set operation: union. Delegates to prepare_op with UnionOp.
#
# @param index [Object] passed through to prepare_op
# @param val [Object, nil] passed through to prepare_op
# @return [Object] whatever prepare_op returns
def union(index, val=nil)
  prepare_op(UnionOp, index, val)
end
#unpageable! ⇒ Object
683 684 685 |
# File 'lib/queris/query.rb', line 683 def unpageable! @pageable = false end |
#update(obj, arg = {}) ⇒ Object
update query results with object(s)
273 274 275 276 277 278 279 280 281 282 283 284 285 |
# File 'lib/queris/query.rb', line 273
# Update query results with an object: either remove it from the results set,
# or add it (with score 0) to the :delta set.
#
# @param obj [Object] a model instance (its `id` is used -- hardcoded id attribute)
#   or a raw id
# @param arg [Hash]
#   :redis  => connection to use instead of redis_master / redis
#   :delete => truthy to zrem the id from the results key
# @return [Object] true when an index key is reused as the results key (nothing
#   is written -- DISCUSS: query.union(subquery) won't be updated); otherwise the
#   result of the zrem / zadd call
def update(obj, arg={})
  return true if uses_index_as_results_key?
  obj_id = (model === obj) ? obj.id : obj
  client = arg[:redis] || redis_master || redis
  return client.zrem(results_key, obj_id) if arg[:delete]
  client.zadd(results_key(:delta), 0, obj_id)
end
#usable_as_results?(*arg) ⇒ Boolean
302 303 304 |
# File 'lib/queris/query.rb', line 302
# A query can always stand in as a results set, whatever arguments are given.
#
# @param arg [Array] ignored
# @return [true]
def usable_as_results?(*arg)
  true
end
#use_page(page) ⇒ Object
658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 |
# File 'lib/queris/query.rb', line 658
# Attaches a page to this query and to all of its subqueries.
#
# Without a block the page is simply installed. With a block the page is
# installed only for the duration of the block and the previous page is put
# back afterwards -- now also when the block raises (previously an exception
# left the temporary page installed).
#
# @param page [Page, nil] the page to use, or nil to detach
# @yield runs with +page+ temporarily installed
# @return [Object, nil] nil when the query is not pageable; otherwise the result
#   of iterating subqueries for the final (re)installation
# @raise [ArgumentError] when page is neither nil nor a Page
def use_page(page)
  @results_key = nil
  return nil unless pageable?
  raise ArgumentError, "must be a Query::Page" unless page.nil? || Page === page
  if block_given?
    previous = @page
    use_page page
    begin
      yield
    ensure
      restored = use_page previous
    end
    restored
  else
    @page = page
    subqueries.each { |s| s.use_page @page }
  end
end
#use_redis(redis_instance) ⇒ Object
seems obsolete with the new run pipeline
72 73 74 75 76 |
# File 'lib/queris/query.rb', line 72
# Points this query and every subquery at the given redis connection.
# (seems obsolete with the new run pipeline)
#
# @param redis_instance [Object] connection stored in @redis
# @return [self]
def use_redis(redis_instance)
  @redis = redis_instance
  subqueries.each do |sub|
    sub.use_redis(redis_instance)
  end
  self
end
#uses_index?(*arg) ⇒ Boolean
431 432 433 434 435 436 437 |
# File 'lib/queris/query.rb', line 431
# Tells whether any of the given indices is marked in @used_index.
#
# @param arg [Array<Queris::Index, #to_sym>] indices (looked up by their name) or index names
# @return [Boolean] true as soon as one of them is found, false otherwise
def uses_index?(*arg)
  arg.any? do |candidate|
    lookup = Queris::Index === candidate ? candidate.name : candidate.to_sym
    @used_index[lookup]
  end
end
#uses_index_as_results_key? ⇒ Boolean
291 292 293 294 295 296 297 298 299 300 |
# File 'lib/queris/query.rb', line 291
# When the query is nothing more than a single operand of a single op (and is
# unsorted), and that operand's index says it is usable as results for the
# operand's value, the index's own key can serve as the results key.
#
# @return [Object, nil] the index key for the operand's value, or nil when the
#   query does not have that shape or the index is not usable as results
def uses_index_as_results_key?
  return nil unless ops.length == 1 && sort_ops.empty?
  operands = ops.first.operands
  return nil unless operands.length == 1
  sole = operands.first
  index = sole.index
  index.usable_as_results?(sole.value) ? index.key(sole.value) : nil
end
#volatile_query_keys ⇒ Object
a list of all keys that need to be refreshed (ttl extended) for a query
307 308 309 310 311 312 313 314 |
# File 'lib/queris/query.rb', line 307
# A list of all keys that need to be refreshed (ttl extended) for a query.
#
# The first element MUST be the existence flag key and the second MUST be the
# results key; live queries add their :live and :marshaled keys, and paged
# queries add whatever the page reports.
#
# @return [Array] the keys
def volatile_query_keys
  keys = [results_key(:exists), results_key]
  if live?
    keys = keys | [results_key(:live), results_key(:marshaled)]
  end
  if paged?
    keys = keys | @page.volatile_query_keys(self)
  end
  keys
end