Class: Rhoconnect::Store

Inherits:
Object
  • Object
show all
Defined in:
lib/rhoconnect/store.rb

Constant Summary collapse

RESERVED_ATTRIB_NAMES =
["attrib_type", "id"]
@@db =
nil

Class Method Summary collapse

Class Method Details

.clone(srckey, dstkey) ⇒ Object

Create a copy of srckey in dstkey



411
412
413
414
415
416
417
418
419
420
421
422
423
# File 'lib/rhoconnect/store.rb', line 411

# Create a copy of srckey in dstkey.
#
# When srckey has bucket indices (as reported by _get_bucket_indices), each
# bucket "#{srckey}:#{index}" is copied to "#{dstkey}:#{index}" inside a single
# pipeline and the index is registered for dstkey. When there are no bucket
# indices, srckey itself is copied to dstkey.
#
# The original guard was `if buckets.size`, which is always truthy in Ruby
# (0 is truthy), so the non-bucketed branch could never run.
def clone(srckey,dstkey)
  buckets = _get_bucket_indices(srckey)
  if buckets.size > 0
    @@db.pipelined do
      buckets.each do |bucket_index|
        _add_bucket_index(dstkey, bucket_index)
        # SDIFFSTORE against the '' key (presumably never populated) stores
        # the source set unchanged under the destination name.
        @@db.sdiffstore("#{dstkey}:#{bucket_index}", "#{srckey}:#{bucket_index}", '')
      end
    end
  else
    @@db.sdiffstore(dstkey,srckey,'')
  end
end

.create(server = nil) ⇒ Object



20
21
22
23
24
# File 'lib/rhoconnect/store.rb', line 20

# Sets up the shared Redis handle (only if it is not already set) by calling
# _get_redis(server), then verifies that the handle is one of the accepted
# client classes.
#
# Raises RuntimeError when the handle is missing or of an unexpected class.
def create(server=nil)
  @@db ||= _get_redis(server)
  usable = @@db &&
    (@@db.is_a?(Redis) || @@db.is_a?(Redis::Client) || @@db.is_a?(ConnectionPool::Wrapper))
  raise "Error connecting to Redis store." unless usable
end

.db ⇒ Object



14
# File 'lib/rhoconnect/store.rb', line 14

# Returns the shared Redis handle, creating it with _get_redis on first use.
def db
  @@db ||= _get_redis
end

.db=(server = nil) ⇒ Object



16
17
18
# File 'lib/rhoconnect/store.rb', line 16

# Replaces the shared Redis handle with whatever _get_redis(server) returns.
def db=(server = nil)
  redis_handle = _get_redis(server)
  @@db = redis_handle
end

.decr(dockey) ⇒ Object



173
174
175
# File 'lib/rhoconnect/store.rb', line 173

# Forwards to the Redis handle's decr for dockey and returns its result.
def decr(dockey)
  @@db.decr dockey
end

.delete_data(dockey, data = {}) ⇒ Object

Deletes data from a given doctype,source,user



146
147
148
149
150
151
# File 'lib/rhoconnect/store.rb', line 146

# Deletes the given data from dockey via _delete_objects.
# Nothing is deleted when either argument is nil/false.
# Always returns true.
def delete_data(dockey,data={})
  _delete_objects(dockey, data) if dockey && data
  true
end

.delete_objects(dockey, data = []) ⇒ Object

Removes objects from a given doctype,source,user



138
139
140
141
142
143
# File 'lib/rhoconnect/store.rb', line 138

# Removes objects from dockey: looks the objects up with get_objects and hands
# the result to _delete_objects, returning whatever that returns.
# Returns 0 without touching the store when either argument is nil/false.
def delete_objects(dockey,data=[])
  if dockey && data
    found = get_objects(dockey, data)
    _delete_objects(dockey, found)
  else
    0
  end
end

.doc_type(dockey) ⇒ Object



34
35
36
# File 'lib/rhoconnect/store.rb', line 34

# Returns the Redis type of dockey, or nil when dockey is nil/false.
def doc_type(dockey)
  return nil unless dockey
  @@db.type(dockey)
end

.execute_transaction ⇒ Object



30
31
32
# File 'lib/rhoconnect/store.rb', line 30

# Issues EXEC on the shared Redis handle and returns its result.
def execute_transaction
  @@db.exec()
end

.exists?(key) ⇒ Boolean

Returns:

  • (Boolean)


543
544
545
# File 'lib/rhoconnect/store.rb', line 543

# Checks whether key exists, either directly or as a bucketed document
# (i.e. "#{key}:indices" exists). The second lookup only happens when the
# first one returns a falsy value.
def exists?(key)
  direct_hit = @@db.exists(key)
  direct_hit || @@db.exists("#{key}:indices")
end

.flash_data(keymask) ⇒ Object

Deletes all keys matching a given mask



337
338
339
340
341
342
343
344
345
346
347
348
349
350
# File 'lib/rhoconnect/store.rb', line 337

# Deletes all keys matching a given mask.
#
# A mask without pattern characters (*, [, ], ?) is deleted directly with a
# single _delete_doc call. Otherwise the KEYS command is used to expand the
# pattern (this is extremely expensive) and every match is deleted.
def flash_data(keymask)
  is_pattern = keymask.to_s =~ /[*\[\]?]/
  return _delete_doc(keymask) unless is_pattern

  @@db.keys(keymask).each { |matched_key| _delete_doc(matched_key) }
end

.flush_multi_zdata(dockey) ⇒ Object

Deletes all keys and their hashes from the Redis DB



532
533
534
535
536
537
538
539
540
541
# File 'lib/rhoconnect/store.rb', line 532

# Deletes every document referenced by the sorted set at dockey, plus the
# per-score "sources" document, and then empties the sorted set itself.
# Returns the result of ZREMRANGEBYRANK.
def flush_multi_zdata(dockey)
  Store.db.zrange(dockey, 0, -1).each do |member|
    # only the first element of the decoded member is needed here
    scored_assoc_key, _source_key, _operation = getelement(member)
    score, assoc_key = scored_assoc_key.split(',')
    _delete_doc("#{dockey}:#{member}")
    _delete_doc("#{dockey}:#{score}:#{assoc_key}:sources")
  end
  Store.db.zremrangebyrank(dockey, 0, -1)
end

.flush_zdata(dockey) ⇒ Object

Deletes all keys and their hashes from the Redis DB



479
480
481
482
483
484
485
# File 'lib/rhoconnect/store.rb', line 479

# Deletes every "#{dockey}:#{member}" document referenced by the sorted set at
# dockey, then empties the sorted set. Returns the result of ZREMRANGEBYRANK.
def flush_zdata(dockey)
  members = Store.db.zrange(dockey, 0, -1)
  members.each { |hash_key| _delete_doc("#{dockey}:#{hash_key}") }
  Store.db.zremrangebyrank(dockey, 0, -1)
end

.get_data(dockey, type = Hash) ⇒ Object

Retrieves set for given dockey,source,user



191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/rhoconnect/store.rb', line 191

# Retrieves the document stored at dockey.
#
# With type == Hash (the default) the members of every bucket returned by
# _get_buckets are fetched in one pipeline and decoded with get_obj_element
# into a { key => object } hash. With any other type the document is read as
# a list through get_list.
#
# Returns an empty Hash/Array (matching type) when dockey is nil/false.
def get_data(dockey,type=Hash)
  as_hash = (type == Hash)
  return (as_hash ? {} : []) unless dockey
  return get_list(dockey) unless as_hash

  result = {}
  buckets = _get_buckets(dockey)
  replies = @@db.pipelined do
    buckets.each { |bucket| @@db.smembers(bucket) } if buckets
  end
  (replies || []).each do |elements|
    next unless elements
    elements.each do |element|
      key, obj = get_obj_element(element)
      result[key] = obj
    end
  end
  result
end

.get_db_doc(dockey) ⇒ Object



46
47
48
49
50
51
52
53
54
55
# File 'lib/rhoconnect/store.rb', line 46

# Returns the document at dockey in a form chosen by its Redis type:
# 'string' documents are returned as the raw value, 'list' documents as a
# JSON array, and everything else as the JSON of Store.get_data(dockey).
def get_db_doc(dockey)
  case Store.doc_type(dockey)
  when 'string'
    Store.get_value(dockey)
  when 'list'
    Store.get_data(dockey, Array).to_json
  else
    Store.get_data(dockey).to_json
  end
end

.get_diff_data(src_dockey, dst_dockey, p_size = nil) ⇒ Object

Retrieves the diff data hash between two sets. Each entry is in the form DIFF_OBJ_ELEMENT => [OBJ_KEY, OBJ_DATA_PAIRS].



225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/rhoconnect/store.rb', line 225

# Builds a hash of the elements present in dst_dockey's buckets but not in the
# matching src_dockey buckets. Buckets are enumerated from
# "#{dst_dockey}:indices". Each entry maps the raw element to the value
# returned by get_obj_key_and_pairs; elements for which that returns nil/false
# are skipped.
#
# When p_size is given, collection stops as soon as the hash holds p_size
# entries. Returns {} when either dockey is nil/false.
def get_diff_data(src_dockey,dst_dockey,p_size=nil)
  diff = {}
  return diff unless src_dockey && dst_dockey

  bucket_indices = @@db.hgetall("#{dst_dockey}:indices").keys
  bucket_indices.each do |index|
    changed = @@db.sdiff("#{dst_dockey}:#{index}", "#{src_dockey}:#{index}")
    changed.each do |element|
      keypairs = get_obj_key_and_pairs(element)
      next unless keypairs
      diff[element] = keypairs
      # page is full - stop early
      return diff if p_size && diff.size >= p_size
    end
  end
  diff
end

.get_inserts_deletes(inserts_elements_map, deletes_elements_map) ⇒ Object



245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
# File 'lib/rhoconnect/store.rb', line 245

# Reduces two element maps (element => [object key, attribute pairs]) to the
# per-object attribute changes.
#
# For an object present in both maps only the pairs unique to each side are
# kept; an object present in just one map contributes all of its pairs.
# Surviving pairs are converted with split_obj_pairs. Entries without a key or
# without pairs are ignored.
#
# Returns [inserts, deletes], both hashes keyed by object key.
def get_inserts_deletes(inserts_elements_map, deletes_elements_map)
  # element map -> { object key => Set of attribute pairs }
  to_pair_sets = lambda do |elements_map|
    elements_map.each_with_object({}) do |(_element, keypairs), sets|
      key, obj_pairs = keypairs[0], keypairs[1]
      sets[key] = Set.new(obj_pairs) if key && obj_pairs
    end
  end

  inserted_sets = to_pair_sets.call(inserts_elements_map)
  pending_deletes = to_pair_sets.call(deletes_elements_map)

  inserts = {}
  deletes = {}
  inserted_sets.each do |key, inserted|
    # taking the entry out marks this object as already processed
    removed = pending_deletes.delete(key)
    if removed
      removed_pairs = (removed - inserted).to_a
      added_pairs = (inserted - removed).to_a
    else
      removed_pairs = []
      added_pairs = inserted.to_a
    end
    inserts[key] = split_obj_pairs(added_pairs) unless added_pairs.empty?
    deletes[key] = split_obj_pairs(removed_pairs) unless removed_pairs.empty?
  end

  # whatever is left was never matched by an insert
  pending_deletes.each do |key, leftover|
    deletes[key] = split_obj_pairs(leftover.to_a) if leftover.size > 0
  end

  [inserts, deletes]
end

.get_list(dockey) ⇒ Object



215
216
217
218
219
220
221
# File 'lib/rhoconnect/store.rb', line 215

# Returns every element of the Redis list at dockey (LRANGE 0..-1), or an
# empty array when dockey is nil/false.
def get_list(dockey)
  return [] unless dockey
  @@db.lrange(dockey, 0, -1)
end

.get_lock(dockey, timeout = 0, raise_on_expire = false) ⇒ Object



360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
# File 'lib/rhoconnect/store.rb', line 360

# Acquires the lock for dockey, retrying once per second until it succeeds.
#
# The lock is the Redis key returned by _lock_key(dockey); its value is an
# expiry timestamp computed once on entry as
#   now + (Rhoconnect.lock_duration || timeout) + 1
# so timeout is only used when Rhoconnect.lock_duration is nil/false.
#
# When raise_on_expire or Rhoconnect.raise_on_expired_lock is set, finding an
# already-expired lock raises StoreLockException. Otherwise an expired lock is
# taken over with GETSET.
#
# Returns the expiry timestamp this call stored (pass it to release_lock).
def get_lock(dockey,timeout=0,raise_on_expire=false)
  lock_key = _lock_key(dockey)
  current_time = Time.now.to_i   
  # computed once: not refreshed while waiting in the loop below
  ts = current_time+(Rhoconnect.lock_duration || timeout)+1
  loop do 
    if not @@db.setnx(lock_key,ts)
      current_lock = @@db.get(lock_key)
      # ensure lock wasn't released between the setnx and get calls
      if current_lock
       	current_lock_timeout = current_lock.to_i
       	if raise_on_expire or Rhoconnect.raise_on_expired_lock
       	  if current_lock_timeout <= current_time
       	    # lock expired before operation which set it up completed
       	    # this process cannot continue without corrupting locked data 
       	    raise StoreLockException, "Lock \"#{lock_key}\" expired before it was released"
       	  end
       	else  
       	  # GETSET returns the previous value: re-check it so that only one
       	  # of several competing processes treats the takeover as successful
       	  if current_lock_timeout <= current_time and 
       	    @@db.getset(lock_key,ts).to_i <= current_time
       	    # previous lock expired and we replaced it with our own
       	    break
       	  end
     	  end
   	  # lock was released between setnx and get - try to acquire it again
 	    elsif @@db.setnx(lock_key,ts)
   	    break
      end
      # still locked by someone else: wait, then refresh the clock and retry
      sleep(1)
      current_time = Time.now.to_i
    else
      break #no lock was set, so we set ours and leaving
    end
  end
  return ts
end

.get_multi_zdata(dockey) ⇒ Object



506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
# File 'lib/rhoconnect/store.rb', line 506

# Reads the multi-source queue stored in the sorted set at dockey.
#
# Members are decoded with getelement into
# ["score,assoc_key", source_key, operation]. Consecutive members sharing a
# score are merged into one { source_key => { operation => objects } } entry.
#
# Returns [entries, sources, keys]: one item per distinct consecutive score,
# where sources holds the "...:sources" list and keys the assoc_key.
def get_multi_zdata(dockey)
  members = Store.db.zrange(dockey, 0, -1)
  ret, sources, keys = [], [], []
  return [ret, sources, keys] if members.nil?

  last_score = nil
  members.each do |zsetkey|
    scored_assoc_key, source_key, operation = getelement(zsetkey)
    score, assoc_key = scored_assoc_key.split(',')
    obj_hash = Store.get_data("#{dockey}:#{zsetkey}")
    if last_score == score.to_i
      # same score as the previous member: extend the current entry
      ret[-1][source_key] ||= {}
      ret[-1][source_key].merge!({operation => obj_hash})
    else
      sources << Store.get_data("#{dockey}:#{score}:#{assoc_key}:sources", Array)
      ret << {source_key => {operation => obj_hash}}
      last_score = score.to_i
      keys << assoc_key
    end
  end
  [ret, sources, keys]
end

.get_object(dockey, key) ⇒ Object



181
182
183
184
# File 'lib/rhoconnect/store.rb', line 181

# Fetches a single object by key from dockey using _get_objects.
# Returns the first value of the result, or nil when nothing was found.
def get_object(dockey, key)
  found = _get_objects(dockey, [key])
  return nil unless found && found.size > 0
  found.values[0]
end

.get_objects(dockey, keys) ⇒ Object



186
187
188
# File 'lib/rhoconnect/store.rb', line 186

# Public entry point for fetching several objects from dockey; delegates
# directly to _get_objects and returns its result unchanged.
def get_objects(dockey, keys)
  _get_objects dockey, keys
end

.get_value(dockey) ⇒ Object

Retrieves value for a given key



165
166
167
# File 'lib/rhoconnect/store.rb', line 165

# Retrieves the value stored at dockey (Redis GET).
# Returns nil without querying the store when dockey is nil/false.
def get_value(dockey)
  return nil unless dockey
  @@db.get(dockey)
end

.get_zdata(dockey) ⇒ Object

Retrieves set for given dockey,associated key (client_id), obj_hashes



457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
# File 'lib/rhoconnect/store.rb', line 457

# Reads the queue stored in the sorted set at dockey.
#
# Each member is decoded with getelement into [score, key, objkey] and its
# object hash is loaded from "#{dockey}:#{member}". Consecutive members with
# the same score are merged into a single hash.
#
# Returns [hashes, keys] with one item per distinct consecutive score.
def get_zdata(dockey)
  members = Store.db.zrange(dockey, 0, -1)
  ret = []
  keys = []
  return [ret, keys] if members.nil?

  last_score = nil
  members.each do |zsetkey|
    obj_hash = Store.get_data("#{dockey}:#{zsetkey}")
    score, key, _objkey = getelement(zsetkey)
    if last_score == score
      ret[-1].merge!(obj_hash)
    else
      ret << obj_hash
      keys << key
      last_score = score
    end
  end
  [ret, keys]
end

.incr(dockey) ⇒ Object



169
170
171
# File 'lib/rhoconnect/store.rb', line 169

# Forwards to the Redis handle's incr for dockey and returns its result.
def incr(dockey)
  @@db.incr dockey
end

.lock(dockey, timeout = 0, raise_on_expire = false) ⇒ Object

Lock a given key and release when provided block is finished



353
354
355
356
357
358
# File 'lib/rhoconnect/store.rb', line 353

# Lock a given key and release it when the provided block is finished.
#
# The lock is obtained with get_lock(dockey, timeout, raise_on_expire) and is
# released through release_lock even when the block raises, so a failing
# block no longer leaves the key locked until the lock expires.
#
# Returns the value of the block. Exceptions raised by get_lock or by the
# block propagate to the caller.
def lock(dockey,timeout=0,raise_on_expire=false)
  m_lock = get_lock(dockey,timeout,raise_on_expire)
  begin
    yield
  ensure
    release_lock(dockey,m_lock)
  end
end

.put_data(dockey, data = {}, append = false) ⇒ Object Also known as: set_data

Adds set with given data, replaces existing set if it exists or appends data to the existing set if append flag set to true



64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/rhoconnect/store.rb', line 64

# Stores data at dockey. Unless append is true the existing document is
# removed first with flash_data. A Hash is written through _put_objects;
# anything else is written as a list through put_list.
#
# Does nothing when dockey or data is nil/false. Always returns true.
def put_data(dockey,data={},append=false)
  return true unless dockey && data

  flash_data(dockey) unless append
  if data.is_a?(Hash)
    _put_objects(dockey, data)
  else
    put_list(dockey, data, append)
  end
  true
end

.put_list(dockey, data = [], append = false) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
# File 'lib/rhoconnect/store.rb', line 77

# Writes data as a Redis list at dockey, pushing every element with RPUSH in
# one pipeline. Unless append is true the existing document is removed first
# with flash_data.
#
# Does nothing when dockey or data is nil/false. Always returns true.
def put_list(dockey, data=[], append=false)
  return true unless dockey && data

  flash_data(dockey) unless append
  @@db.pipelined do
    data.each { |element| @@db.rpush(dockey, element) }
  end
  true
end

.put_multi_zdata(dockey, assoc_key, sources = [], data = {}, append = false) ⇒ Object

These methods should be used for transactional, multi-source CUD queue requests.



488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
# File 'lib/rhoconnect/store.rb', line 488

# Appends one multi-source entry to the sorted-set queue at dockey.
#
# The entry gets the score following the current highest one (1 for an empty
# queue). The sources list is stored at "#{dockey}:#{score}:#{assoc_key}:sources",
# and for every source/operation pair in data one member is added to the
# sorted set with its objects stored at "#{dockey}:#{member}".
#
# Unless append is true the queue is flushed first with flush_multi_zdata.
# Does nothing when any of dockey, assoc_key, sources or data is nil/false.
# Always returns true.
def put_multi_zdata(dockey,assoc_key,sources=[],data={},append=false)
  return true unless dockey && assoc_key && sources && data

  flush_multi_zdata(dockey) unless append
  top_entry = Store.db.zrevrange(dockey,0,0,:with_scores => true)
  highest_score = (top_entry && top_entry[-1]) ? top_entry[-1][1].to_i : 0
  next_score = highest_score + 1
  Store.put_data("#{dockey}:#{next_score}:#{assoc_key}:sources", sources)
  data.each do |source_key,source_hashes|
    source_hashes.each do |operation,obj_hashes|
      record_key = setelement("#{next_score},#{assoc_key}",source_key,operation)
      Store.db.zadd(dockey, next_score, record_key)
      Store.put_data("#{dockey}:#{record_key}", obj_hashes)
    end
  end
  true
end

.put_object(dockey, key, data = {}) ⇒ Object



57
58
59
# File 'lib/rhoconnect/store.rb', line 57

# Stores a single object under key in dockey by wrapping it in a one-entry
# hash and delegating to _put_objects.
def put_object(dockey, key, data={})
  single_entry = {key => data}
  _put_objects(dockey, single_entry)
end

.put_value(dockey, value) ⇒ Object Also known as: set_value

Adds a simple key/value pair



154
155
156
157
158
159
160
161
162
# File 'lib/rhoconnect/store.rb', line 154

# Adds a simple key/value pair. The value is stored as a string; a nil/false
# value deletes the key instead. Returns nil without touching the store when
# dockey is nil/false.
def put_value(dockey,value)
  return nil unless dockey
  value ? @@db.set(dockey,value.to_s) : @@db.del(dockey)
end

.put_zdata(dockey, assoc_key, data = {}, append = false) ⇒ Object



441
442
443
444
445
446
447
448
449
450
451
452
453
454
# File 'lib/rhoconnect/store.rb', line 441

# Appends the objects in data to the sorted-set queue at dockey.
#
# All objects of one call share the score following the current highest one
# (1 for an empty queue). Each object gets a member built by setelement and
# is stored at "#{dockey}:#{member}" as { key => hash_value }.
#
# Unless append is true the queue is flushed first with flush_zdata.
# Does nothing when dockey, assoc_key or data is nil/false. Always returns true.
def put_zdata(dockey,assoc_key,data={},append=false)
  return true unless dockey && assoc_key && data

  flush_zdata(dockey) unless append
  top_entry = Store.db.zrevrange(dockey,0,0,:with_scores => true)
  highest_score = (top_entry && top_entry[-1]) ? top_entry[-1][1].to_i : 0
  next_score = highest_score + 1
  data.each do |key,hash_value|
    record_key = setelement(next_score,assoc_key, key)
    Store.db.zadd(dockey, next_score, record_key)
    Store.put_data("#{dockey}:#{record_key}",{key => hash_value})
  end
  true
end

.release_lock(dockey, lock, raise_on_expire = false) ⇒ Object

Due to redis bug #140, setnx always returns true, so this simpler implementation doesn’t work:

def get_lock(dockey,timeout=0)

lock_key = _lock_key(dockey)
until @@db.setnx(lock_key,1) do 
  sleep(1) 
end
@@db.expire(lock_key,timeout+1)
Time.now.to_i+timeout+1

end



406
407
408
# File 'lib/rhoconnect/store.rb', line 406

# Deletes the lock key for dockey when the caller is still entitled to:
# either expired-lock raising is enabled (raise_on_expire or
# Rhoconnect.raise_on_expired_lock), or the lock timestamp has not passed yet.
# Returns nil when the lock is left alone.
def release_lock(dockey,lock,raise_on_expire=false)
  may_release = raise_on_expire ||
    Rhoconnect.raise_on_expired_lock ||
    (lock >= Time.now.to_i)
  @@db.del(_lock_key(dockey)) if may_release
end

.rename(srckey, dstkey) ⇒ Object

Rename srckey to dstkey



426
427
428
429
430
431
432
433
434
435
436
437
438
439
# File 'lib/rhoconnect/store.rb', line 426

# Rename srckey to dstkey.
#
# When srckey has bucket indices (as reported by _get_bucket_indices), the
# source index document is deleted and every bucket "#{srckey}:#{index}" is
# renamed to "#{dstkey}:#{index}" inside a single pipeline. When there are no
# bucket indices, srckey itself is renamed, provided it exists.
#
# The original guard was `if buckets.size`, which is always truthy in Ruby
# (0 is truthy), so the non-bucketed branch could never run.
def rename(srckey,dstkey)
  buckets = _get_bucket_indices(srckey)
  if buckets.size > 0
    @@db.pipelined do
      @@db.del("#{srckey}:indices")
      buckets.each do |bucket_index|
        _add_bucket_index(dstkey, bucket_index)
        @@db.rename("#{srckey}:#{bucket_index}", "#{dstkey}:#{bucket_index}")
      end
    end
  else
    # RENAME fails on a missing key, hence the existence check
    @@db.rename(srckey,dstkey) if @@db.exists(srckey)
  end
end

.set_db_doc(dockey, data, append = false) ⇒ Object



38
39
40
41
42
43
44
# File 'lib/rhoconnect/store.rb', line 38

# Stores data at dockey: a String goes through put_value, anything else
# through put_data (honouring append).
def set_db_doc(dockey, data, append=false)
  return put_value(dockey, data) if data.is_a?(String)
  put_data(dockey, data, append)
end

.start_transaction ⇒ Object



26
27
28
# File 'lib/rhoconnect/store.rb', line 26

# Issues MULTI on the shared Redis handle and returns its result.
def start_transaction
  @@db.multi()
end

.update_count(dockey, count) ⇒ Object



177
178
179
# File 'lib/rhoconnect/store.rb', line 177

# Adds count to the counter at dockey (Redis INCRBY) and returns the store's
# reply.
def update_count(dockey, count)
  store_db = Store.db
  store_db.incrby(dockey, count)
end

.update_elements(dockey, inserts_elements_map, deletes_elements_map) ⇒ Object



294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
# File 'lib/rhoconnect/store.rb', line 294

# Applies raw element changes to the bucketed document at dockey.
#
# Both maps are element => [object key, ...]. Elements are grouped by the
# bucket derived from their object key (_create_obj_index); entries with a
# missing key or a missing/empty element are skipped. Inside one pipeline the
# removals (SREM) are issued before the additions (SADD), one command per
# bucket. Buckets that had removals are then passed to _cleanup_buckets,
# whose result is returned.
def update_elements(dockey, inserts_elements_map, deletes_elements_map)
  buckets_with_removals = Set.new
  @@db.pipelined do
    adds_by_bucket = {}
    rems_by_bucket = {}

    inserts_elements_map.each do |element,keypairs|
      key = keypairs[0]
      next unless key && element && element.size != 0

      bucket_index = _create_obj_index(key)
      bucket_name = "#{dockey}:#{bucket_index}"
      _add_bucket_index(dockey, bucket_index)
      (adds_by_bucket[bucket_name] ||= []) << element
    end

    deletes_elements_map.each do |element,keypairs|
      key = keypairs[0]
      next unless key && element && element.size != 0

      bucket_index = _create_obj_index(key)
      bucket_name = "#{dockey}:#{bucket_index}"
      buckets_with_removals << bucket_name
      (rems_by_bucket[bucket_name] ||= []) << element
    end

    # SREM must be issued before SADD
    rems_by_bucket.each { |bucket, elements| @@db.srem(bucket, elements) }
    adds_by_bucket.each { |bucket, elements| @@db.sadd(bucket, elements) }
  end
  # drop buckets that may have become empty
  _cleanup_buckets(dockey, buckets_with_removals.to_a)
end

.update_objects(dockey, data = {}) ⇒ Object

Updates objects for a given doctype, source, user; creates new objects if necessary.



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/rhoconnect/store.rb', line 91

# Updates objects for a given dockey, creating new objects if necessary.
#
# data maps object keys to attribute hashes. Existing objects (looked up with
# get_objects) are merged with the new attributes; the old serialized element
# is removed and the merged one is added. Objects not found are created from
# the given attributes alone. All Redis commands are sent in one pipeline,
# with one SREM/SADD per bucket.
#
# Returns the number of newly created objects, or 0 when dockey or data is
# nil/false.
def update_objects(dockey, data={})
  return 0 unless dockey and data
  
  new_object_count = 0
  objs = get_objects(dockey, data.keys) || {}
  
  collected_adds = {}
  collected_rems = {}
  @@db.pipelined do
    data.each do |key,obj|
      is_create = objs[key].nil?
      new_object_count += 1 if is_create
      obj_bucket = _add_bucket_index(dockey, "#{_create_obj_index(key)}")
      
      # collect SREM (if object exists in DB)
      unless is_create
        old_element = set_obj_element(key,objs[key])
        collected_rems[obj_bucket] ||= []
        collected_rems[obj_bucket] << old_element
      end
      # update the object and collect SADD
      objs[key] ||= {}
      objs[key].merge!(obj)
      
      new_element = set_obj_element(key,objs[key])
      collected_adds[obj_bucket] ||= []
      collected_adds[obj_bucket] << new_element
    end
    # process all SADD and SREM commands as one
    # SREM must go first
    collected_rems.each do |bucket, bucket_data|
      @@db.srem(bucket, bucket_data)
    end
    collected_adds.each do |bucket, bucket_data|
      @@db.sadd(bucket, bucket_data)
    end
  end
  
  new_object_count
end