Class: Queris::Query

Inherits:
Object
  • Object
show all
Defined in:
lib/queris/query.rb,
lib/queris/query/page.rb,
lib/queris/query/timer.rb,
lib/queris/query/trace.rb,
lib/queris/query/operations.rb

Defined Under Namespace

Classes: DiffOp, Error, IntersectOp, Op, Page, SortOp, Timer, Trace, UnionOp

Constant Summary collapse

MINIMUM_QUERY_TTL =

seconds. Don’t mess with this number unless you fully understand it, setting it too small may lead to subquery race conditions

30

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(model, arg = nil, &block) ⇒ Query

Returns a new instance of Query.

Raises:

  • (ArgumentError)


18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/queris/query.rb', line 18

def initialize(model, arg=nil, &block)
  if model.kind_of?(Hash) and arg.nil?
    arg, model = model, model[:model]
  elsif arg.nil?
    arg= {}
  end
  raise ArgumentError, "Can't create query without a model" unless model
  raise Error, "include Queris in your model (#{model.inspect})." unless model.include? Queris
  @model = model
  @params = {}
  @ops = []
  @sort_ops = []
  @used_index = {}
  @redis_prefix = arg[:complete_prefix] || ((arg[:prefix] || arg[:redis_prefix] || model.prefix) + self.class.name.split('::').last + ":")
  @redis=arg[:redis]
  @profile = arg[:profiler] || model.query_profiler.new(nil, :redis => @redis || model.redis)
  @subqueries = []
  self.ttl=arg[:ttl] || 600 #10 minutes default time-to-live
  @temp_key_ttl = arg[:temp_key_ttl] || 300
  @trace=nil
  @pageable = arg[:pageable] || arg[:paged]
  live! if arg[:live]
  realtime! if arg[:realtime]
  @created_at = Time.now.utc
  if @expire_after = (arg[:expire_at] || arg[:expire] || arg[:expire_after])
    raise ArgumentError, "Can't create query with expire_at option and check_staleness options at once" if arg[:check_staleness]
    raise ArgumentError, "Can't create query with expire_at option with track_stats disabled" if arg[:track_stats]==false
    arg[:track_stats]=true
    arg[:check_staleness] = Proc.new do |query|
      query.time_cached < (@expire_after || Time.at(0))
    end
  end

  @from_hash = arg[:from_hash]
  @restore_failed_callback = arg[:restore_failed]
  @delete_missing = arg[:delete_missing]

  @track_stats = arg[:track_stats]
  @check_staleness = arg[:check_staleness]
  if block_given?
    instance_eval(&block)
  end
  self
end

Instance Attribute Details

#created_atObject

Returns the value of attribute created_at.



16
17
18
# File 'lib/queris/query.rb', line 16

def created_at
  @created_at
end

#modelObject

Returns the value of attribute model.



16
17
18
# File 'lib/queris/query.rb', line 16

def model
  @model
end

#opsObject

Returns the value of attribute ops.



16
17
18
# File 'lib/queris/query.rb', line 16

def ops
  @ops
end

#paramsObject

Returns the value of attribute params.



16
17
18
# File 'lib/queris/query.rb', line 16

def params
  @params
end

#redis_prefixObject

Returns the value of attribute redis_prefix.



16
17
18
# File 'lib/queris/query.rb', line 16

def redis_prefix
  @redis_prefix
end

#run_idObject

Returns the value of attribute run_id.



977
978
979
# File 'lib/queris/query.rb', line 977

def run_id
  @run_id
end

#sort_opsObject

Returns the value of attribute sort_ops.



16
17
18
# File 'lib/queris/query.rb', line 16

def sort_ops
  @sort_ops
end

#subqueriesObject (readonly)

Returns the value of attribute subqueries.



17
18
19
# File 'lib/queris/query.rb', line 17

def subqueries
  @subqueries
end

#temp_key_ttlObject (readonly)

Returns the value of attribute temp_key_ttl.



17
18
19
# File 'lib/queris/query.rb', line 17

def temp_key_ttl
  @temp_key_ttl
end

#timerObject

Returns the value of attribute timer.



967
968
969
# File 'lib/queris/query.rb', line 967

def timer
  @timer
end

#ttlObject

Returns the value of attribute ttl.



17
18
19
# File 'lib/queris/query.rb', line 17

def ttl
  @ttl
end

Instance Method Details

#add_temp_key(k) ⇒ Object



316
317
318
319
# File 'lib/queris/query.rb', line 316

def add_temp_key(k)
  @temp_keys ||= {}
  @temp_keys[k]=true
end

#all_index_keysObject



958
959
960
961
962
963
964
965
# File 'lib/queris/query.rb', line 958

def all_index_keys
  keys = []
  [ops, sort_ops].each do |ops|
    ops.each { |op| keys.concat op.keys(nil, true) }
  end
  keys << key
  keys.uniq
end

#all_indicesObject

list all indices (including subqueries)



454
455
456
# File 'lib/queris/query.rb', line 454

def all_indices
  indices :subqueries => true
end

#all_live_indicesObject



457
458
459
460
461
462
463
464
# File 'lib/queris/query.rb', line 457

def all_live_indices
  return [] unless live?
  ret = all_indices
  ret.select! do |i|
    (ForeignIndex === i ? i.real_index : i).live?
  end
  ret
end

#all_query_keysObject

all keys related to a query



326
327
328
329
330
# File 'lib/queris/query.rb', line 326

def all_query_keys
  all = volatile_query_keys
  all |= temp_keys
  all
end

#all_subqueriesObject



388
389
390
391
392
# File 'lib/queris/query.rb', line 388

def all_subqueries
  ret = subqueries.dup
  subqueries.each { |sub| ret.concat sub.all_subqueries }
  ret
end

#count(opt = {}) ⇒ Object Also known as: size, length



748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
# File 'lib/queris/query.rb', line 748

def count(opt={})
  use_page nil
  run(:no_update => !realtime?) unless opt [:no_run]
  key = results_key
  case redis.type(key)
  when 'set'
    raise Error, "Query results are not a sorted set (maybe using a set index directly), can't range" if opt[:score]
    redis.scard key
  when 'zset'
    if opt[:score]
      range = opt[:score]
      raise ArgumentError, ":score option must be a Range, but it's a #{range.class} instead" unless Range === range
      first = range.begin * sort_mult
      last = range.exclude_end? ? "(#{range.end.to_f * sort_mult}" : range.end.to_f * sort_mult #)
      first, last = last, first if sort_mult == -1
      redis.zcount(key, first, last)
    else
      redis.zcard key
    end
  else #not a set. 
    0
  end
end

#delta(arg = {}) ⇒ Object



287
288
289
290
# File 'lib/queris/query.rb', line 287

def delta(arg={})
  myredis = arg[:redis] || redis_master || redis
  myredis.zrange results_key(:delta), 0, -1
end

#diff(index, val = nil) ⇒ Object Also known as:



94
95
96
# File 'lib/queris/query.rb', line 94

def diff(index, val=nil)
  prepare_op DiffOp, index, val
end

#each_operand(which_ops = nil) ⇒ Object

walk though all query operands



950
951
952
953
954
955
956
# File 'lib/queris/query.rb', line 950

def each_operand(which_ops=nil) #walk though all query operands
  (which_ops == :sort ? sort_ops : ops).each do |operation|
    operation.operands.each do |operand|
      yield operand, operation
    end
  end
end

#each_subquery(recursive = true) ⇒ Object

dependency-ordered



394
395
396
397
398
399
400
401
# File 'lib/queris/query.rb', line 394

def each_subquery(recursive=true) #dependency-ordered
  subqueries.each do |s|
    s.each_subquery do |ss|
      yield ss
    end
    yield s
  end
end

#explain(opt = {}) ⇒ Object



803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
# File 'lib/queris/query.rb', line 803

def explain(opt={})
  return "(∅)" if ops.nil? || ops.empty?
  first_op = ops.first
  r = ops.map do |op|
    operands = op.operands.map do |o|
      if Query === o.index
        if opt[:subqueries] != false 
          o.index.explain opt
        else
          "{subquery #{subquery_id o.index}}"
        end
      else
        value = case opt[:serialize]
        when :json
          JSON.dump o.value
        when :ruby
          Marshal.dump o.value
        else #human-readable and sufficiently unique
          o.value.to_s
        end
        "#{o.index.name}#{(value.empty? || opt[:structure])? nil : "<#{value}>"}"
      end
    end
    op_str = operands.join " #{op.symbol} "
    if first_op == op
      op_str.insert 0, "∅ #{op.symbol} " if DiffOp === op
    else
      op_str.insert 0, " #{op.symbol} "
    end
    op_str
  end
  "(#{r.join})"
end

#first_resultObject



714
715
716
# File 'lib/queris/query.rb', line 714

def first_result
  return result 0
end

#flush(arg = {}) ⇒ Object Also known as: clear

recursively and conditionally flush query and subqueries arg parameters: flush query if:

:ttl - query.ttl <= ttl
:index (symbol or index or an array of them) - query uses th(is|ese) ind(ex|ices)

or flush conditionally according to passed block: flush {|query| true } when no parameters or block present, flush only this query and no subqueries



472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
# File 'lib/queris/query.rb', line 472

def flush(arg={})
  model.run_query_callbacks :before_flush, self
  return 0 if uses_index_as_results_key?
  flushed = 0
  if block_given? #efficiency hackety hack - anonymous blocs are heaps faster than bound ones
    subqueries.each { |sub| flushed += sub.flush arg, &Proc.new }
  elsif arg.count>0
    subqueries.each { |sub| flushed += sub.flush arg }
  end
  if flushed > 0 || arg.count==0 || ttl <= (arg[:ttl] || 0) || (uses_index?(*arg[:index])) || block_given? && (yield self)
    #this only works because of the slave EXPIRE hack requiring dummy query results_keys on master.
    #otherwise, we'd have to create the key first (in a MULTI, of course)
    n_deleted = 0
    (redis_master || redis).multi do |r|
      all = all_query_keys
      all.each do |k|
        r.setnx k, 'baleted'
      end
      n_deleted = r.del all 
    end
    @known_to_exist = nil
    flushed += n_deleted.value
  end
  flushed
end

#gather_ready_data(r) ⇒ Object



1122
1123
1124
1125
1126
# File 'lib/queris/query.rb', line 1122

def gather_ready_data(r)
  unless paged?
    @ready = Queris.run_script :unpaged_query_ready, r, [ results_key, results_key(:exists), runstate_key(:ready) ]
  end
end

#idObject



744
745
746
# File 'lib/queris/query.rb', line 744

def id
  Queris.digest results_key
end

#indices(opt = {}) ⇒ Object

list all indices used by a query (no subqueries, unless asked for)



440
441
442
443
444
445
446
447
448
449
450
451
452
# File 'lib/queris/query.rb', line 440

def indices(opt={})
  ret = @used_index.dup
  if opt[:subqueries]
    subqueries.each do |sub|
      ret.merge! sub.indices(subqueries: true, hash: true)
    end
  end
  if opt[:hash]
    ret
  else
    ret.values
  end
end

#info(opt = {}) ⇒ Object



844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
# File 'lib/queris/query.rb', line 844

def info(opt={})
  ind = opt[:indent] || ""
  info = "#{ind}#{self} info:\r\n"
  info <<  "#{ind}key: #{results_key}\r\n" unless opt[:no_key]
  info << "#{ind}redis key type:#{redis.type key}, size: #{count :no_run => true}\r\n" unless opt[:no_size]
  info << "#{ind}liveliness:#{live? ? (realtime? ? 'realtime' : 'live') : 'static'}"
  if live?
    #live_keyscores = redis.zrange(results_key(:live), 0, -1, :with_scores => true)
    #info << live_keyscores.map{|i,v| "#{i}:#{v}"}.join(", ")
    info << " (live indices: #{redis.zcard results_key(:live)})"
  end
  info << "\r\n"
  info << "#{ind}id: #{id}, ttl: #{ttl}, sort: #{sorting_by || "none"}\r\n" unless opt[:no_details]
  info << "#{ind}remaining ttl: results: #{redis.pttl(results_key)}ms, existence flag: #{redis.pttl results_key(:exists)}ms, marshaled: #{redis.pttl results_key(:marshaled)}ms\r\n" if opt[:debug_ttls]
  unless @subqueries.empty? || opt[:no_subqueries]
    info << "#{ind}subqueries:\r\n"
    @subqueries.each do |sub|
      info << sub.info(opt.merge(:indent => ind + "  ", :output=>false))
    end
  end
  opt[:output]!=false ? puts(info) : info
end

#intersect(index, val = nil) ⇒ Object Also known as:



89
90
91
# File 'lib/queris/query.rb', line 89

def intersect(index, val=nil)
  prepare_op IntersectOp, index, val
end

#json_redis_dumpObject



898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
# File 'lib/queris/query.rb', line 898

def json_redis_dump
  queryops = []
  ops.each {|op| queryops.concat(op.json_redis_dump)}
  queryops.reverse!
  sortops = []
  sort_ops.each{|op| sortops.concat(op.json_redis_dump)}
  ret = {
    key: results_key,
    live: live?,
    realtime: realtime?,
    ops_reverse: queryops,
    sort_ops: sortops
  }
  ret
end

#key(arg = nil) ⇒ Object Also known as: key_for_query



739
740
741
# File 'lib/queris/query.rb', line 739

def key arg=nil
  results_key
end

#key_size(redis_key = nil, r = nil) ⇒ Object

current query size



775
776
777
# File 'lib/queris/query.rb', line 775

def key_size(redis_key=nil, r=nil)
  Queris.run_script :multisize, (r || redis), [redis_key || key]
end

#live!Object

live queries have pending updates stored nearby



249
# File 'lib/queris/query.rb', line 249

def live!; @live=true; @realtime=false; validate_ttl; self; end

#live=(val) ⇒ Object



242
# File 'lib/queris/query.rb', line 242

def live=(val);  @live= val; end

#live?Boolean

Returns:

  • (Boolean)


241
# File 'lib/queris/query.rb', line 241

def live?; @live; end

#marshal_dumpObject



867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
# File 'lib/queris/query.rb', line 867

def marshal_dump
  subs = {}
  @subqueries.each { |sub| subs[sub.id.to_sym]=sub.marshal_dump }
  unique_params = params.dup
  each_operand do |op|
    unless Query === op.index
      param_name = op.index.name
      unique_params.delete param_name if params[param_name] == op.value
    end
  end
  {
    model: model.name.to_sym,
    ops: ops.map{|op| op.marshal_dump},
    sort_ops: sort_ops.map{|op| op.marshal_dump},
    subqueries: subs,
    params: unique_params,
    
    args: {
      complete_prefix: redis_prefix,
      ttl: ttl,
      expire_after: @expire_after,
      track_stats: @track_stats,
      live: @live,
      realtime: @realtime,
      from_hash: @from_hash,
      delete_missing: @delete_missing,
      pageable: pageable?
    }
  }
end

#marshal_load(data) ⇒ Object



914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
# File 'lib/queris/query.rb', line 914

def marshal_load(data)
  if Hash === data
    initialize Queris.model(data[:model]), data[:args]
    subqueries = {}
    data[:subqueries].map do |id, sub|
      q = Query.allocate
      q.marshal_load sub
      subqueries[id]=q
    end
    [ data[:ops], data[:sort_ops] ].each do |operations| #replay all query operations
      operations.each do |operation|
        operation.last.each do |op|
          index = subqueries[op[0]] || @model.redis_index(op[0])
          self.send operation.first, index, op.last
        end
      end
    end
    data[:params].each do |name, val|
      params[name]=val
    end
  else #legacy
    if data.kind_of? String
      arg = JSON.load(data)
    elsif data.kind_of? Enumerable
      arg = data
    else
      raise Query::Error, "Reloading query failed. data: #{data.to_s}"
      #arg = [] #SILENTLY FAIL RELOADING QUERY. THIS IS A *DANGER*OUS DESIGN DECISION MADE FOR THE SAKE OF CONVENIENCE.
    end
    arg.each { |n,v| instance_variable_set "@#{n}", v }
  end
end

#marshaledObject



946
947
948
# File 'lib/queris/query.rb', line 946

def marshaled
  Marshal.dump self
end

#member?(id) ⇒ Boolean Also known as: contains?

Returns:

  • (Boolean)


687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
# File 'lib/queris/query.rb', line 687

def member?(id)
  id = id.id if model === id
  use_page nil
  run :no_update => !realtime?
  case t = redis.type(results_key)
  when 'set'
    redis.sismember(results_key, id)
  when 'zset'
    !redis.zrank(results_key, id).nil?
  when 'none'
    false
  else
    raise ClientError, "unexpected result set type #{t}"
  end
end

#no_optimize!Object



342
343
344
345
# File 'lib/queris/query.rb', line 342

def no_optimize!
  @no_optimize=true
  subqueries.each &:no_optimize!
end

#pageable!Object



680
681
682
# File 'lib/queris/query.rb', line 680

def pageable!
  @pageable = true
end

#pageable?Boolean

Returns:

  • (Boolean)


677
678
679
# File 'lib/queris/query.rb', line 677

def pageable?
  @pageable
end

#paged?Boolean

Returns:

  • (Boolean)


674
675
676
# File 'lib/queris/query.rb', line 674

def paged?
  @page
end

#param(param_name) ⇒ Object

retrieve query parameters, as fed through union and intersect and diff



79
80
81
# File 'lib/queris/query.rb', line 79

def param(param_name)
  @params[param_name.to_sym]
end

#profilerObject



237
238
239
# File 'lib/queris/query.rb', line 237

def profiler
  @profile
end

#query_run_stage_begin(r, q) ⇒ Object



1044
1045
1046
1047
1048
1049
# File 'lib/queris/query.rb', line 1044

def query_run_stage_begin(r,q)
  if uses_index_as_results_key?
    @trace.message "Using index as results key." if @trace_callback
  end
  new_run #how procedural...
end

#query_run_stage_inspect(r, q) ⇒ Object



1050
1051
1052
1053
1054
1055
1056
# File 'lib/queris/query.rb', line 1050

def query_run_stage_inspect(r,q)
  gather_ready_data r
  if live?
    @itshere[:marshaled]=r.exists results_key(:marshaled)
    @itshere[:live]=r.exists results_key(:live)
  end
end

#query_run_stage_prepare(r, q) ⇒ Object



1077
1078
1079
# File 'lib/queris/query.rb', line 1077

def query_run_stage_prepare(r,q)
  return unless @reserved
end

#query_run_stage_release(r, q) ⇒ Object



1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
# File 'lib/queris/query.rb', line 1093

def query_run_stage_release(r,q)
  return unless @reserved
  r.setnx results_key(:exists), 1
  if paged? && fluxcap(@already_exists)
    Queris.run_script :master_expire, r, volatile_query_keys, [ ttl, nil, true ]
    Queris.run_script :master_expire, r, reusable_temp_keys, [ temp_key_ttl, nil, true ]
  else
    Queris.run_script :master_expire, r, volatile_query_keys, [ ttl , true, true]
    Queris.run_script :master_expire, r, reusable_temp_keys, [ temp_key_ttl, nil, true ]
  end
  
  r.del q.runstate_keys
  @reserved = nil
  
  #make sure volatile keys don't overstay their welcome
  min = Query::MINIMUM_QUERY_TTL
  Queris.run_script :undo_add_low_ttl, r, [ runstate_key(:low_ttl) ], [ min ]
end

#query_run_stage_reserve(r, q) ⇒ Object



1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
# File 'lib/queris/query.rb', line 1057

def query_run_stage_reserve(r,q)
  return if ready?
  @reserved = true
  if live?
    #marshaled query 
    r.setex results_key(:marshaled), ttl, JSON.dump(json_redis_dump) unless fluxcap @itshere[:marshaled]
    unless fluxcap @itshere[:live]
      now=Time.now.utc.to_f
      all_live_indices.each do |i|
        r.zadd results_key(:live), now, i.live_delta_key
      end
      r.expire results_key(:live), ttl
    end
  end
  @already_exists = r.get results_key(:exists) if paged?
  #make sure volatile keys don't disappear
  
  Queris.run_script :add_low_ttl, r, [ *volatile_query_keys, runstate_key(:low_ttl) ], [ Query::MINIMUM_QUERY_TTL ]
end

#query_run_stage_run(r, q) ⇒ Object



1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
# File 'lib/queris/query.rb', line 1081

def query_run_stage_run(r,q)
  return unless @reserved
  unless ready?
    run_static_query r
  else
    
    if live?
      @live_update_msg = Queris.run_script(:update_query, r, [results_key(:marshaled), results_key(:live)], [Time.now.utc.to_f])
    end
  end
end

#range(range) ⇒ Object

Raises:

  • (ArgumentError)


500
501
502
503
504
505
506
# File 'lib/queris/query.rb', line 500

def range(range)
  raise ArgumentError, "Range, please." unless Range === range
  pg=make_page
  pg.range=range
  use_page pg
  self
end

#raw_results(*arg) ⇒ Object



653
654
655
656
# File 'lib/queris/query.rb', line 653

def raw_results(*arg)
  arg.push :raw
  results(*arg)
end

#ready?(r = nil, subs = true) ⇒ Boolean

Returns:

  • (Boolean)


1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
# File 'lib/queris/query.rb', line 1112

def ready?(r=nil, subs=true)
  return true if uses_index_as_results_key? || fluxcap(@ready)
  r ||= redis
  ready = if paged?
    @page.ready?
  else
    fluxcap @ready
  end
  ready
end

#realtime!Object

realtime queries are updated automatically, on the spot



251
252
253
254
# File 'lib/queris/query.rb', line 251

def realtime!
  live!
  @realtime = true
end

#realtime?Boolean

Returns:

  • (Boolean)


255
256
257
# File 'lib/queris/query.rb', line 255

def realtime?
  live? && @realtime
end

#redisObject



68
69
70
# File 'lib/queris/query.rb', line 68

def redis
  @redis || Queris.redis(:slave) || model.redis || redis_master
end

#resortObject

apply a sort to set of existing results



232
233
234
235
# File 'lib/queris/query.rb', line 232

def resort #apply a sort to set of existing results
  @resort=true
  self
end

#result(n = 0) ⇒ Object



705
706
707
708
709
710
711
712
# File 'lib/queris/query.rb', line 705

def result(n=0)
  res = results(n...n+1)
  if res.length > 0 
    res.first
  else
    nil
  end
end

#result_score(id) ⇒ Object



644
645
646
# File 'lib/queris/query.rb', line 644

def result_score(id)
  result_scores([id]).first
end

#result_scores(*ids) ⇒ Object



631
632
633
634
635
636
637
638
639
640
641
642
# File 'lib/queris/query.rb', line 631

def result_scores(*ids)
  val=[]
  if ids.count == 1 && Array === ids[0]
    ids=ids[0]
  end
  val=redis.multi do |r|
    ids.each do |id|
      redis.zscore(results_key, id)
    end
  end
  val
end

#results(*arg) ⇒ Object

flexible query results retriever results(x..y) from x to y results(x, y) same results(x) first x results results(x..y, :reverse) range in reverse results(x..y, :score =>a..b) results from x to y with scores in given score range results(x..y, :with_scores) return results with result.query_score attr set to the score results(x..y, :replace => [:foo_id, FooModel]) like an SQL join. returns results replaced by FooModels with ids from foo_id



516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
# File 'lib/queris/query.rb', line 516

def results(*arg)
  opt= Hash === arg.last ? arg.pop : {}
  opt[:reverse]=true if arg.member?(:reverse)
  opt[:with_scores]=true if arg.member?(:with_scores)
  opt[:range]=arg.shift if Range === arg.first
  opt[:range]=(arg.shift .. arg.shift) if Numeric === arg[0] && arg[0].class == arg[1].class
  opt[:range]=(0..arg.shift) if Numeric === arg[0]
  opt[:raw] = true if arg.member? :raw
  if opt[:range] && !sort_ops.empty? && pageable?
    range opt[:range]
    run :no_update => !realtime?
  else
    use_page nil
    run :no_update => !realtime?
  end
  @timer.start("results") if @timer
  key = results_key
  case (keytype=redis.type(key))
  when 'set'
    raise Error, "Can't range by score on a regular results set" if opt[:score]
    raise NotImplemented, "Cannot get result range from shortcut index result set (not sorted); must retrieve all results. This is a temporary queris limitation." if opt[:range]
    cmd, first, last, rangeopt = :smembers, nil, nil, {}
  when 'zset'
    rangeopt = {}
    rangeopt[:with_scores] = true if opt[:with_scores]
    if (r = opt[:range])
      first, last = r.begin, r.end - (r.exclude_end? ? 1 : 0)
      raise ArgumentError, "Query result range must have numbers, instead there's a #{first.class} and #{last.class}" unless Numeric === first && Numeric === last
      raise ArgumentError, "Query results range must have integer endpoints" unless first.round == first && last.round == last
    end
    if (scrange = opt[:score])
      rangeopt[:limit] = [ first, last - first ] if opt[:range]
      raise NotImplemented, "Can't fetch results with_scores when also limiting them by score. Pick one or the other." if opt[:with_scores]
      raise ArgumentError, "Query.results :score parameter must be a Range" unless Range === scrange
      first = Queris::to_redis_float(scrange.begin * sort_mult)
      last = Queris::to_redis_float(scrange.end * sort_mult)
      last = "(#{last}" if scrange.exclude_end? #)
      first, last = last, first if sort_mult == -1
      cmd = opt[:reverse] ? :zrevrangebyscore : :zrangebyscore
    else
      cmd = opt[:reverse] ? :zrevrange : :zrange
    end
  else
    return []
  end
  @timer.start :results
  if block_given? && opt[:replace_command]
    res = yield cmd, key, first || 0, last || -1, rangeopt
  elsif @from_hash && !opt[:raw]
    
    if rangeopt[:limit]
      limit, offset = *rangeopt[:limit]
    else
      limit, offset = nil, nil
    end
    if opt[:replace]
      replace=opt[:replace]
      replace_id_attr=replace.first
      replace_model=replace.last
      raise Queris::Error, "replace model must be a Queris model, is instead #{replace_model}" unless replace_model < Queris::Model
      replace_keyf=replace_model.keyf
    end
    raw_res, ids, failed_i = redis.evalsha(Queris::script_hash(:results_from_hash), [key], [cmd, first || 0, last || -1, @from_hash, limit, offset, rangeopt[:with_scores] && :true, replace_keyf, replace_id_attr])
    res, to_be_deleted = [], []
    
    result_model= replace.nil? ? model : replace_model
    
    raw_res.each_with_index do |raw_hash, i|
      my_id = ids[i]
      if failed_i.first == i
        failed_i.shift
        obj = result_model.find_cached my_id, :assume_missing => true
      else
        if (raw_hash.count % 2) > 0
          binding.pry
          1+1.12
        end
        unless (obj = result_model.restore(raw_hash, my_id))
          #we could stil have received an invalid cache object (too few attributes, for example)
          obj = result_model.find_cached my_id, :assume_missing => true
        end
      end
      if not obj.nil?
        res << obj
      elsif @delete_missing
        to_be_deleted << my_id
      end
    end
    redis.evalsha Queris.script_hash(:remove_from_sets), all_index_keys, to_be_deleted unless to_be_deleted.empty?
  else
    if cmd == :smembers
      res = redis.send(cmd, key)
    else
      res = redis.send(cmd, key, first || 0, last || -1, rangeopt)
    end
  end
  if block_given? && !opt[:replace_command]
    if opt[:with_scores]
      ret = []
      res.each do |result|
        obj = yield result.first
        ret << [obj, result.last] unless obj.nil?
      end
      res = ret
    else
      res.map!(&Proc.new).compact!
    end
  end
  if @timer
    @timer.finish :results
    #puts "Timing for #{self}: \r\n  #{@timer}"
  end
  res
end

#results_key(suffix = nil, raw_id = nil) ⇒ Object



718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
# File 'lib/queris/query.rb', line 718

def results_key(suffix = nil, raw_id = nil)
  if @results_key.nil? or raw_id
    if (reused_set_key = uses_index_as_results_key?)
      @results_key = reused_set_key
    else
      theid = raw_id || (Queris.digest(explain :subqueries => false) << ":subqueries:#{(@subqueries.length > 0 ? @subqueries.map{|q| q.id}.sort.join('&') : 'none')}" << ":sortby:#{sorting_by || 'nothing'}")
      thekey = "#{@redis_prefix}results:#{theid}"
      thekey << ":paged:#{@page.source_id}" if paged?
      if raw_id
        return thekey
      else
        @results_key = thekey
      end
    end
  end
  if suffix
    "#{@results_key}:#{suffix}"
  else
    @results_key
  end
end

#run(opt = {}) ⇒ Object

Raises:



1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
# File 'lib/queris/query.rb', line 1129

def run(opt={})
  raise ClientError, "No redis connection found for query #{self} for model #{self.model.name}." if redis.nil?
  @timer = Query::Timer.new
  #parse run options
  force = opt[:force]
  force = nil if Numeric === force && force <= 0
  if trace?
    @trace= Trace.new self, @must_trace
  end
  run_callbacks :before_run
  if uses_index_as_results_key?
    #do nothing, we're using a results key directly from an index which is guaranteed to already exist
    @trace.message "Using index as results key." if @trace
    return count(:no_run => true)
  end

  Queris::RedisStats.querying = true
  #run this sucker
  #the following logic must apply to ALL queries (and subqueries),
  #since all queries run pipeline stages in 'parallel' (on the same redis pipeline)
  @timer.start :time
  run_pipeline redis, :begin do |r, q|
    #run live index callbacks
    all_live_indices.each do |i|
      if i.respond_to? :before_query
        i.before_query r, self
      end
    end
    run_stage :inspect, r
    @page.inspect_query(r, self) if paged?
  end
  if !ready? || force
    run_pipeline redis_master, :reserve
    if paged?
      begin
        @page.seek
        run_pipeline redis, :prepare, :run, :after_run do |r, q|
          @page.inspect_query(r, self)
        end
      end until @page.ready?
    else
      run_pipeline redis, :prepare, :run, :after_run
    end
    run_pipeline redis_master, :release
  else
    if live? && !opt[:no_update]
      @timer.start :live_update
      live_update_msg = Queris.run_script(:update_query, redis, [results_key(:marshaled), results_key(:live)], [Time.now.utc.to_f])
      @trace.message "Live query update: #{live_update_msg}" if @trace
      @timer.finish :live_update
    end
    @trace.message "Query results already exist, no trace available." if @trace
  end
  @timer.finish :time
  Queris::RedisStats.querying = false
end

#run_callbacks(event, with_subqueries = true) ⇒ Object



1038
1039
1040
1041
1042
# File 'lib/queris/query.rb', line 1038

def run_callbacks(event, with_subqueries=true)
  each_subquery { |s| s.model.run_query_callbacks(event, s) } if with_subqueries
  model.run_query_callbacks event, self
  self
end

#run_pipeline(redis, *stages) ⇒ Object



1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
# File 'lib/queris/query.rb', line 1024

def run_pipeline(redis, *stages)
  #{stages.join ', '}
  pipesig = "pipeline #{stages.join ','}"
  @timer.start pipesig
  redis.pipelined do |r|
    #r = redis
    stages.each do |stage|
      run_stage(stage, r)
    end
    yield(r, self) if block_given?
  end
  @timer.finish pipesig
end

#run_stage(stage, r, recurse = true) {|r| ... } ⇒ Object

Yields:

  • (r)


990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
# File 'lib/queris/query.rb', line 990

def run_stage(stage, r, recurse=true)
  #puts "query run stage #{stage} START for #{self}"
  method_name = "query_run_stage_#{stage}".to_sym
  yield(r) if block_given?
  
  #page first
  @page.send method_name, r, self if paged? && @page.respond_to?(method_name)
  
  #then subqueries
  each_subquery do |sub| 
    #assumes this iterator respects subquery dependency ordering (deepest subqueries first)
    sub.run_stage stage, r, false
  end
  #now operations, indices etc.
  [ops, sort_ops].each do |arr|
    arr.each do |operation|
      if operation.respond_to? method_name
        operation.send method_name, r, self
      end
      operation.operands.each do |op|
        if op.respond_to? method_name
          op.send method_name, r, self
        end
        if !op.is_query? && op.index.respond_to?(method_name)
          op.index.send method_name, r, self
        end
      end
    end
  end
  #and finally, the meat
  self.send method_name, r, self if respond_to? method_name
  #puts "query run stage #{stage} END for #{self}"
end

#runstate_key(sub = nil) ⇒ Object



982
983
984
985
986
987
988
# File 'lib/queris/query.rb', line 982

def runstate_key(sub=nil)
  @runstate_keys ||= {}
  k="#{redis_prefix}run:#{run_id}"
  k << ":#{sub}"if sub
  @runstate_keys[k]=true
  k
end

#runstate_keysObject



978
979
980
981
# File 'lib/queris/query.rb', line 978

def runstate_keys
  @runstate_keys ||= {}
  @runstate_keys.keys
end

#sort(index, reverse = nil) ⇒ Object



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/queris/query.rb', line 141

def sort(index, reverse = nil)
  # accept a minus sign in front of index name to mean reverse
  @results_key = nil
  if Query === index
    raise ArgumentError, "Sort can't be extracted from queries over different models." unless index.model == model
    sort_query = index
    if sort_query.sort_ops.empty?
      index = nil
    else #copy sort from another query
      sort_query.sort_ops.each do |op|
        op.operands.each do |operand| 
          use_index operand.index unless Query === index
        end
      end
      self.sort_ops = sort_query.sort_ops.dup
      sort_query.sort false #unsort sorted query - legacy behavior, probably a bad idea in the long run
    end
  else
    if index.respond_to?('[]') 
      if index[0] == '-'
        reverse, index = true, index[1..-1]
      elsif index[0] == '+'
        reverse, index = false, index[1..-1]
      end
    end
    if index
      index = use_index index #accept string index names and indices and queries
      real_index = ForeignIndex === index ? index.real_index : index
      raise ArgumentError, "Must have a RangeIndex for sorting, found #{real_index.class.name}" unless RangeIndex === real_index
      self.sort_ops.clear << SortOp.new.push(index, reverse)
    else
      self.sort_ops.clear
    end
  end
  use_page make_page
  self
end

#sort_multObject

sort multiplier (direction) – currently, +1 or -1



227
228
229
230
# File 'lib/queris/query.rb', line 227

def sort_mult #sort multiplier (direction) -- currently, +1 or -1
  return 1 if sort_ops.empty?
  sort_ops.first.operands.first.value
end

#sort_score(obj, arg = {}) ⇒ Object

get an object’s sorting score, or its previous sorting score if asked for



205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/queris/query.rb', line 205

def sort_score(obj, arg={})
  score = 0
  sort_ops.each do |op|
    op.operands.each do |o|
      if arg[:previous]
        val = obj.send "#{o.index.attribute}_was"
      else
        val = obj.send o.index.attribute
      end
      
      score += o.value * (val || 0).to_f
    end
  end
  score
end

#sortby(index, direction = 1) ⇒ Object

SortOp::SYMBOL



178
179
180
# File 'lib/queris/query.rb', line 178

def sortby(index, direction = 1) #SortOp::SYMBOL
  sort index, direction == -1
end

#sorting_byObject



221
222
223
224
225
226
# File 'lib/queris/query.rb', line 221

def sorting_by
  sorting = sort_ops.map do |op|
    op.operands.map{|o| "#{(o.value < 0) ? '-' : ''}#{o.index.name}" }.join('+')
  end.join('+')
  sorting.empty? ? nil : sorting.to_sym
end

#sorting_by?(index) ⇒ Boolean

Returns:

  • (Boolean)


182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/queris/query.rb', line 182

def sorting_by? index
  val = 1
  if index.respond_to?("[]") && !(Index === index) then
    if index[0]=='-' then
      index, val = index[1..-1], -1
    end
  end
  begin
    index=@model.redis_index(index)
  rescue
    index = nil
  end
  if index
    sort_ops.each do |op|
      op.operands.each do |o|
        return true if o.index == index && o.value == val
      end
    end
  end
  nil
end

#static!Object



245
246
247
# File 'lib/queris/query.rb', line 245

def static! 
  @live=false; @realtime=false; self; 
end

#static?Boolean

static queries are updated only after they expire

Returns:

  • (Boolean)


244
# File 'lib/queris/query.rb', line 244

def static?; !live!; end

#structureObject



840
841
842
# File 'lib/queris/query.rb', line 840

def structure
  explain :structure => true
end

#subquery(arg = {}) ⇒ Object



779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
# File 'lib/queris/query.rb', line 779

def subquery arg={}
  if arg.kind_of? Query #adopt a given query as subquery
    raise Error, "Trying to use a subquery from a different model" unless arg.model == model
  else #create new subquery
    arg[:model]=model
  end
  @used_subquery ||= {}
  @results_key = nil
  if arg.kind_of? Query
    subq = arg
  else
    subq = self.class.new((arg[:model] or model), arg.merge(:complete_prefix => redis_prefix, :ttl => @ttl))
  end
  subq.use_redis redis
  unless @used_subquery[subq]
    @used_subquery[subq]=true
    @subqueries << subq
  end
  subq
end

#subquery_id(subquery) ⇒ Object



799
800
801
# File 'lib/queris/query.rb', line 799

def subquery_id(subquery)
  @subqueries.index subquery
end

#temp_keysObject



321
322
323
324
# File 'lib/queris/query.rb', line 321

def temp_keys
  @temp_keys ||= {}
  @temp_keys.keys
end

#to_sObject



836
837
838
# File 'lib/queris/query.rb', line 836

def to_s
  explain
end

#trace(opt = {}) ⇒ Object



414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
# File 'lib/queris/query.rb', line 414

def trace(opt={})
  indent = opt[:indent] || 0
  buf = "#{"  " * indent}#{indent == 0 ? 'Query' : 'Subquery'} #{self}:\r\n"
  buf << "#{"  " * indent}key: #{key}\r\n"
  buf << "#{"  " * indent}ttl:#{ttl}, |#{redis.type(key)} results key|=#{count(:no_run => true)}\r\n"
  buf << "#{"  " * indent}trace:\r\n"
  case @trace
  when nil
    buf << "#{"  " * indent}No trace available, query hasn't been run yet."
  when false
    buf << "#{"  " * indent}No trace available, query was run without :trace parameter. Try query.run(:trace => true)"
  else
    buf << @trace.indent(indent).to_s
  end
  opt[:output]!=false ? puts(buf) : buf
end

#trace!(opt = {}) ⇒ Object



403
404
405
406
407
408
409
410
# File 'lib/queris/query.rb', line 403

def trace!(opt={})
  if opt==false
    @must_trace = false
  elsif opt
    @must_trace = opt
  end
  self
end

#trace?Boolean

Returns:

  • (Boolean)


411
412
413
# File 'lib/queris/query.rb', line 411

def trace?
  @must_trace || @trace
end

#undo(num_operations = 1) ⇒ Object

undo the last n operations

Raises:



128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/queris/query.rb', line 128

def undo (num_operations=1)
  return self if num_operations == 0 || ops.empty?
  @results_key = nil
  op = ops.last
  raise ClientError, "Unexpected operand-less query operation" unless (last_operand = op.operands.last)
  if Query === (sub = last_operand.index)
    subqueries.delete sub
  end
  op.operands.pop
  ops.pop if op.operands.empty?
  undo(num_operations - 1)
end

#union(index, val = nil) ⇒ Object Also known as:

the set operations



84
85
86
# File 'lib/queris/query.rb', line 84

def union(index, val=nil)
  prepare_op UnionOp, index, val
end

#unpageable!Object



683
684
685
# File 'lib/queris/query.rb', line 683

def unpageable!
  @pageable = false
end

#update(obj, arg = {}) ⇒ Object

update query results with object(s)



273
274
275
276
277
278
279
280
281
282
283
284
285
# File 'lib/queris/query.rb', line 273

def update(obj, arg={})
  if uses_index_as_results_key? # DISCUSS : query.union(subquery) won't be updated
    return true
  end
  obj_id = model === obj ? obj.id : obj  #BUG-IN-WAITING: HARDCODED id attribute
  myredis = arg[:redis] || redis_master || redis
  if arg[:delete]
    ret = myredis.zrem results_key, obj_id
  else
    ret = myredis.zadd results_key(:delta), 0, obj_id
  end
  ret
end

#usable_as_results?(*arg) ⇒ Boolean

Returns:

  • (Boolean)


302
303
304
# File 'lib/queris/query.rb', line 302

def usable_as_results?(*arg)
  true
end

#use_page(page) ⇒ Object

Raises:

  • (ArgumentError)


658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
# File 'lib/queris/query.rb', line 658

def use_page(page)
  @results_key=nil
  if !pageable?
    return nil
  end
  raise ArgumentError, "must be a Query::Page" unless page.nil? || Page === page
  if block_given?
    temp=@page
    use_page page
    yield
    use_page temp
  else
    @page=page
    subqueries.each {|s| s.use_page @page}
  end
end

#use_redis(redis_instance) ⇒ Object

seems obsolete with the new run pipeline



72
73
74
75
76
# File 'lib/queris/query.rb', line 72

def use_redis(redis_instance) #seems obsolete with the new run pipeline
  @redis = redis_instance
  subqueries.each {|sub| sub.use_redis redis_instance}
  self
end

#uses_index?(*arg) ⇒ Boolean

Returns:

  • (Boolean)


431
432
433
434
435
436
437
# File 'lib/queris/query.rb', line 431

def uses_index?(*arg)
  arg.each do |ind|
    index_name = Queris::Index === ind ? ind.name : ind.to_sym
    return true if @used_index[index_name]
  end
  false
end

#uses_index_as_results_key?Boolean

Returns:

  • (Boolean)


291
292
293
294
295
296
297
298
299
300
# File 'lib/queris/query.rb', line 291

def uses_index_as_results_key?
  if ops.length == 1 && sort_ops.empty? && ops.first.operands.length == 1
    first_op = ops.first.operands.first
    first_index = first_op.index
    if first_index.usable_as_results? first_op.value
      return first_index.key first_op.value
    end
  end
  nil
end

#volatile_query_keysObject

a list of all keys that need to be refreshed (ttl extended) for a query



307
308
309
310
311
312
313
314
# File 'lib/queris/query.rb', line 307

def volatile_query_keys
  #first element MUST be the existence flag key
  #second element MUST be the results key
  volatile = [ results_key(:exists), results_key ]
  volatile |=[ results_key(:live), results_key(:marshaled) ] if live?
  volatile |= @page.volatile_query_keys(self) if paged?
  volatile
end