Class: Rhoconnect::Source

Inherits:
MemoryOrm show all
Includes:
Document, LockOps
Defined in:
lib/rhoconnect/source.rb

Instance Attribute Summary collapse

Attributes inherited from MemoryOrm

#id

Class Method Summary collapse

Instance Method Summary collapse

Methods included from LockOps

#lock

Methods included from Document

#clone, #compute_store_index, #compute_token, #delete_data, #docname, #exists?, #flush_data, #get_data, #get_db_doc, #get_diff_data, #get_diff_data_bruteforce, #get_list, #get_object, #get_objects, #get_value, included, #put_data, #put_list, #put_object, #put_tmp_data, #put_value, #remove_objects, #rename, #rename_tmp_data, #set_db_doc, #update_count, #update_elements, #update_objects, #verify_doctype

Methods inherited from MemoryOrm

class_prefix, define_fields, is_exist?, #to_array, #update_fields, validates_presence_of

Constructor Details

#initialize(fields) ⇒ Source

Returns a new instance of Source.



122
123
124
125
# File 'lib/rhoconnect/source.rb', line 122

def initialize(fields)
  self.name = fields['name'] || fields[:name]
  update_fields(fields)
end

Instance Attribute Details

#app_idObject

Returns the value of attribute app_id.



102
103
104
# File 'lib/rhoconnect/source.rb', line 102

def app_id
  @app_id
end

#user_idObject

Returns the value of attribute user_id.



102
103
104
# File 'lib/rhoconnect/source.rb', line 102

def user_id
  @user_id
end

Class Method Details

.create(fields, params) ⇒ Object



155
156
157
158
159
160
161
162
163
# File 'lib/rhoconnect/source.rb', line 155

def self.create(fields,params)
  fields = fields.with_indifferent_access # so we can access hash keys as symbols
  super(fields,params)
  @@model_data[fields[:name].to_sym] = {}
  set_defaults(fields)
  obj = new(fields)
  obj.assign_args(params)
  obj
end

.delete_allObject



205
206
207
208
209
210
211
212
213
# File 'lib/rhoconnect/source.rb', line 205

def self.delete_all
  params = {:app_id => APP_NAME,:user_id => '*'}
  @@model_data.each do |k,v|
    s = Source.load(k,params)
    s.flush_store_data
    Store.flush_data("source:#{s.name}:*")
  end
  @@model_data = {}
end

.load(obj_id, params) ⇒ Object



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/rhoconnect/source.rb', line 165

def self.load(obj_id,params)
  validate_attributes(params)

  # if source is pre-defined
  # create it dynamically here
  Rhoconnect.create_predefined_source(obj_id,params)

  model_hash = @@model_data[obj_id.to_sym]

  obj = new(model_hash) if model_hash
  if obj
    obj = obj.dup
    obj.assign_args(params)
  end
  obj
end

.set_defaults(fields) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/rhoconnect/source.rb', line 132

def self.set_defaults(fields)
  fields[:url] ||= ''
  fields[:login] ||= ''
  fields[:password] ||= ''
  fields[:priority] ||= 3
  fields[:partition_type] = fields[:partition_type] ? fields[:partition_type].to_sym : :user
  fields[:poll_interval] ||= 300
  fields[:sync_type] = fields[:sync_type] ? fields[:sync_type].to_sym : :incremental
  fields[:id] = fields[:name]
  fields[:rho__id] = fields[:name]
  fields[:belongs_to] = fields[:belongs_to].to_json if fields[:belongs_to]
  fields[:schema] = fields[:schema].to_json if fields[:schema]
  fields[:retry_limit] = fields[:retry_limit] ? fields[:retry_limit] : 0
  fields[:simulate_time] = fields[:simulate_time] ? fields[:simulate_time] : 0
  fields[:push_notify] = fields[:push_notify] ? fields[:push_notify] : 'false'
end

.update_associations(sources) ⇒ Object



182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/rhoconnect/source.rb', line 182

def self.update_associations(sources)
  params = {:app_id => APP_NAME,:user_id => '*'}
  sources.each { |source| Source.load(source, params).has_many = nil }
  sources.each do |source|
    s = Source.load(source, params)
    if s.belongs_to
      belongs_to = JSON.parse(s.belongs_to)
      if belongs_to.is_a?(Array)
        belongs_to.each do |entry|
          attrib = entry.keys[0]
          model = entry[attrib]
          owner = Source.load(model, params)
          owner.has_many ||= ''
          owner.has_many = owner.has_many+',' if owner.has_many.length > 0
          owner.has_many += [source,attrib].join(',')
        end
      else
        log "WARNING: Incorrect belongs_to format for #{source}, belongs_to should be an array."
      end
    end
  end
end

Instance Method Details

#announce_changesObject



435
436
437
438
439
440
441
442
# File 'lib/rhoconnect/source.rb', line 435

def announce_changes
  return unless push_notify?
  # TODO: currently we're not allowing 'Broadcast' push to all users for :app partitioned sources
  return if self.partition.to_sym == :app

  users = [self.user_id]
  User.ping({'user_id' => users, 'sources' => [self.name]})
end

#appObject

Return the app the source belongs to



255
256
257
# File 'lib/rhoconnect/source.rb', line 255

def app
  App.load(self.app_id)
end

#assign_args(params) ⇒ Object



215
216
217
218
# File 'lib/rhoconnect/source.rb', line 215

def assign_args(params)
  self.user_id = params[:user_id]
  self.app_id = params[:app_id]
end

#blob_attribsObject



220
221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/rhoconnect/source.rb', line 220

def blob_attribs
  return '' unless self.schema
  schema = JSON.parse(self.schema)
  blob_attribs = []
  schema['property'].each do |key,value|
    values = value ? value.split(',') : []
    if values.include?('blob')
      attrib = key.dup
      attrib << "," + (values.include?('overwrite') ? '1' : '0')
      blob_attribs << attrib
    end
  end
  blob_attribs.sort.join(',')
end

#check_refresh_timeObject



377
378
379
380
# File 'lib/rhoconnect/source.rb', line 377

def check_refresh_time
  self.poll_interval == 0 or
  (self.poll_interval != -1 and self.read_state.refresh_time <= Time.now.to_i)
end

#deleteObject



346
347
348
349
# File 'lib/rhoconnect/source.rb', line 346

def delete
  flush_store_data
  @@model_data.delete(rho__id.to_sym) if rho__id
end

#delete_user_read_stateObject



275
276
277
278
279
# File 'lib/rhoconnect/source.rb', line 275

def delete_user_read_state
  id = {:app_id => self.app_id,:user_id => user_by_partition,
    :source_name => self.name}
  ReadState.delete_user(id)
end

#doc_suffix(doctype) ⇒ Object



281
282
283
# File 'lib/rhoconnect/source.rb', line 281

def doc_suffix(doctype)
  "#{user_by_partition}:#{self.name}:#{doctype.to_s}"
end

#flush_queue(doctype) ⇒ Object



332
333
334
335
336
337
# File 'lib/rhoconnect/source.rb', line 332

def flush_queue(doctype)
  verify_doctype(doctype)
  lock_queue_doc(doctype) do |s|
    Store.get_store(0).flush_zdata(s.queue_docname(doctype))
  end
end

#flush_store_dataObject



285
286
287
288
289
290
291
292
293
294
295
# File 'lib/rhoconnect/source.rb', line 285

def flush_store_data
  delete_user_read_state
  self.class.valid_doctypes.each do |docname, doctype|
    case doctype
    when :queue
      flush_queue(docname)
    when :document
      flush_data(docname)
    end
  end
end

#get_queue(doctype) ⇒ Object



322
323
324
325
326
327
328
329
330
# File 'lib/rhoconnect/source.rb', line 322

def get_queue(doctype)
  verify_doctype(doctype)
  ret = []
  keys = []
  lock_queue_doc(doctype) do |s|
    ret, keys = Store.get_store(0).get_zdata(s.queue_docname(doctype))
  end
  [ret, keys]
end

#if_need_refresh(client_id = nil, params = nil) {|client_id, params| ... } ⇒ Object

Yields:

  • (client_id, params)


382
383
384
385
386
387
388
389
390
# File 'lib/rhoconnect/source.rb', line 382

def if_need_refresh(client_id=nil,params=nil)
  need_refresh = lock(:md) do |s|
    check = check_refresh_time
    self.read_state.prev_refresh_time = self.read_state.refresh_time if check
    self.read_state.refresh_time = Time.now.to_i + self.poll_interval if check
    check
  end
  yield client_id,params if need_refresh
end

#is_pass_through?Boolean

Returns:

  • (Boolean)


427
428
429
# File 'lib/rhoconnect/source.rb', line 427

def is_pass_through?
  self.pass_through and self.pass_through.to_s == 'true'
end

#load_read_stateObject



269
270
271
272
273
# File 'lib/rhoconnect/source.rb', line 269

def load_read_state
  id = {:app_id => self.app_id,:user_id => user_by_partition,
    :source_name => self.name}
  ReadState.load(id)
end

#lock_queue_doc(doctype) ⇒ Object

this data is not sharded



303
304
305
306
307
# File 'lib/rhoconnect/source.rb', line 303

def lock_queue_doc(doctype)
  Store.lock(queue_docname(doctype)) do
    yield self
  end
end

#partitionObject



351
352
353
# File 'lib/rhoconnect/source.rb', line 351

def partition
  self.partition_type.to_sym
end

#partition=(value) ⇒ Object



355
356
357
# File 'lib/rhoconnect/source.rb', line 355

def partition=(value)
  self.partition_type = value
end

#partition_nameObject



363
364
365
366
367
368
369
370
371
372
373
374
375
# File 'lib/rhoconnect/source.rb', line 363

def partition_name
  # edge case - used in deleting documents
  return self.user_id if self.user_id == '*'
  # default is user_id
  pname = self.user_id
  begin
    model_klass = Rhoconnect::Model::Base.load_source_model(self)
    pname = model_klass ? model_klass.partition_name(self.user_id) : self.user_id
  # eat the exception here
  rescue Exception #=> e
  end
  pname
end

#poll_intervalObject



240
241
242
243
# File 'lib/rhoconnect/source.rb', line 240

def poll_interval
  value = Store.get_value(poll_interval_key)
  value ? value.to_i : nil
end

#poll_interval=(interval) ⇒ Object



245
246
247
# File 'lib/rhoconnect/source.rb', line 245

def poll_interval=(interval)
  Store.put_value(poll_interval_key, interval)
end

#process_queue(doctype) ⇒ Object

this is an atomic operation

  • lock queue, get queue data, flush queue, unlock queue



311
312
313
314
315
316
317
318
319
320
# File 'lib/rhoconnect/source.rb', line 311

def process_queue(doctype)
  verify_doctype(doctype)
  ret = []
  keys = []
  lock_queue_doc(doctype) do |s|
    ret, keys = Store.get_store(0).get_zdata(s.queue_docname(doctype))
    Store.get_store(0).flush_zdata(s.queue_docname(doctype))
  end
  [ret, keys]
end

#push_notify?Boolean

Returns:

  • (Boolean)


431
432
433
# File 'lib/rhoconnect/source.rb', line 431

def push_notify?
  self.push_notify and self.push_notify.to_s == 'true'
end

#push_queue(doctype, assoc_key, data = [], append = false) ⇒ Object



339
340
341
342
343
344
# File 'lib/rhoconnect/source.rb', line 339

def push_queue(doctype,assoc_key, data=[],append=false)
  verify_doctype(doctype)
  lock_queue_doc(doctype) do |s|
    Store.get_store(0).put_zdata(s.queue_docname(doctype),assoc_key, data,append)
  end
end

#queue_docname(dockey) ⇒ Object



297
298
299
300
# File 'lib/rhoconnect/source.rb', line 297

def queue_docname(dockey)
  # currently, all queues are bound by user - not shared
  "#{self.class.class_prefix(self.class)}:#{self.app_id}:#{self.name}:#{dockey.to_s}"
end

#read_stateObject



263
264
265
266
267
# File 'lib/rhoconnect/source.rb', line 263

def read_state
  id = {:app_id => self.app_id,:user_id => user_by_partition,
    :source_name => self.name}
  load_read_state || ReadState.create(id)
end

#rewind_refresh_time(query_failure) ⇒ Object



392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
# File 'lib/rhoconnect/source.rb', line 392

def rewind_refresh_time(query_failure)
  return if self.poll_interval == 0
  lock(:md) do |s|
    rewind_time = false
    # reset number of retries
    # and prev_refresh_time on succesfull query
    # or if last refresh was more than 'poll_interval' time ago
    if not query_failure or ((Time.now.to_i - self.read_state.prev_refresh_time) >= self.poll_interval)
      # we need to reset the prev_refresh_time here
      # otherwise in case of expired poll interval
      # and repeating failures - it will reset the counter
      # on every error
      self.read_state.prev_refresh_time = Time.now.to_i
      self.read_state.retry_counter = 0
    end

    # rewind the refresh time on failure
    # if retry limit is not reached
    if query_failure
      if self.read_state.retry_counter < self.retry_limit
        self.read_state.increment!(:retry_counter)
        rewind_time = true
        # we have reached the limit - do not rewind the refresh time
        # and reset the counter
      else
        self.read_state.retry_counter = 0
      end
    end

    if rewind_time
      self.read_state.refresh_time = self.read_state.prev_refresh_time
    end
  end
end

#schemaObject



259
260
261
# File 'lib/rhoconnect/source.rb', line 259

def schema
  self.get_value(:schema)
end

#store_index(doctype) ⇒ Object



149
150
151
152
153
# File 'lib/rhoconnect/source.rb', line 149

def store_index(doctype)
  # app-partitioned sources go to 0
  # everything else if sharded
  compute_store_index(doctype, self, self.user_id)
end

#to_hashObject



127
128
129
130
# File 'lib/rhoconnect/source.rb', line 127

def to_hash
  res = super
  res.merge({:user_id=>self.user_id,:app_id=>self.app_id})
end

#update(fields) ⇒ Object



235
236
237
238
# File 'lib/rhoconnect/source.rb', line 235

def update(fields)
  fields = fields.with_indifferent_access # so we can access hash keys as symbols
  self.class.set_defaults(fields)
end

#userObject

Return the user associated with a source



250
251
252
# File 'lib/rhoconnect/source.rb', line 250

def user
  User.load(self.user_id)
end

#user_by_partitionObject



359
360
361
# File 'lib/rhoconnect/source.rb', line 359

def user_by_partition
  self.partition.to_sym == :user ? partition_name : '__shared__'
end