Class: Rhoconnect::Source

Inherits:
MemoryOrm show all
Includes:
Document, LockOps
Defined in:
lib/rhoconnect/source.rb

Instance Attribute Summary collapse

Attributes inherited from MemoryOrm

#id

Class Method Summary collapse

Instance Method Summary collapse

Methods included from LockOps

#lock

Methods included from Document

#clone, #compute_store_index, #compute_token, #delete_data, #docname, #exists?, #flush_data, #get_data, #get_db_doc, #get_diff_data, #get_diff_data_bruteforce, #get_list, #get_object, #get_objects, #get_value, included, #put_data, #put_list, #put_object, #put_tmp_data, #put_value, #remove_objects, #rename, #rename_tmp_data, #set_db_doc, #update_count, #update_elements, #update_objects, #verify_doctype

Methods inherited from MemoryOrm

class_prefix, define_fields, is_exist?, #to_array, #to_hash, #update_fields, validates_presence_of

Constructor Details

#initialize(fields) ⇒ Source

Returns a new instance of Source.



122
123
124
125
# File 'lib/rhoconnect/source.rb', line 122

# Builds a source from a hash of field values.
#
# The name is taken from the 'name' string key, falling back to the :name
# symbol key, and is assigned before the full hash is handed to
# update_fields.
def initialize(fields)
  source_name = fields['name']
  source_name ||= fields[:name]
  self.name = source_name
  update_fields(fields)
end

Instance Attribute Details

#app_id ⇒ Object

Returns the value of attribute app_id.



102
103
104
# File 'lib/rhoconnect/source.rb', line 102

# Reader for the @app_id instance variable.
def app_id
  @app_id
end

#user_id ⇒ Object

Returns the value of attribute user_id.



102
103
104
# File 'lib/rhoconnect/source.rb', line 102

# Reader for the @user_id instance variable.
def user_id
  @user_id
end

Class Method Details

.create(fields, params) ⇒ Object



150
151
152
153
154
155
156
157
158
# File 'lib/rhoconnect/source.rb', line 150

# Registers a new source definition and returns an instance bound to the
# given params.
#
# The fields hash is converted with with_indifferent_access, passed to the
# parent implementation, an empty entry is stored in @@model_data under the
# symbolized name, defaults are filled in, and the resulting instance gets
# its user/app assigned from params.
def self.create(fields,params)
  # indifferent access lets the rest of the method use symbol keys
  fields = fields.with_indifferent_access
  super(fields,params)
  @@model_data[fields[:name].to_sym] = {}
  set_defaults(fields)
  new(fields).tap { |source| source.assign_args(params) }
end

.delete_all ⇒ Object



200
201
202
203
204
205
206
207
208
# File 'lib/rhoconnect/source.rb', line 200

# Flushes the stored data of every registered source and then empties the
# @@model_data registry.
#
# Each source is loaded for APP_NAME with the '*' user id, its store data is
# flushed, and every key matching "source:<name>:*" is flushed from Store.
def self.delete_all
  wildcard = {:app_id => APP_NAME,:user_id => '*'}
  @@model_data.each_key do |source_key|
    source = Source.load(source_key, wildcard)
    source.flush_store_data
    Store.flush_data("source:#{source.name}:*")
  end
  @@model_data = {}
end

.load(obj_id, params) ⇒ Object



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/rhoconnect/source.rb', line 160

# Loads the source registered under obj_id and binds it to params.
#
# Returns nil when @@model_data has no entry for obj_id; otherwise returns a
# dup of a freshly built instance with user/app assigned from params.
def self.load(obj_id,params)
  validate_attributes(params)

  # a pre-defined source is created on demand before the lookup
  Rhoconnect.create_predefined_source(obj_id,params)

  model_hash = @@model_data[obj_id.to_sym]
  return nil unless model_hash

  instance = new(model_hash)
  return instance unless instance

  copy = instance.dup
  copy.assign_args(params)
  copy
end

.set_defaults(fields) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/rhoconnect/source.rb', line 127

# Fills in default values on the given fields hash, in place.
#
# - url, login and password default to ''
# - priority defaults to 3, poll_interval to 300
# - partition_type and sync_type are symbolized (:user / :incremental when
#   absent)
# - id and rho__id are always set to the name
# - belongs_to and schema, when present, are replaced by their JSON encoding
# - retry_limit and simulate_time default to 0, push_notify to 'false'
def self.set_defaults(fields)
  [:url, :login, :password].each { |key| fields[key] ||= '' }
  fields[:priority] ||= 3
  fields[:partition_type] = (fields[:partition_type] || :user).to_sym
  fields[:poll_interval] ||= 300
  fields[:sync_type] = (fields[:sync_type] || :incremental).to_sym
  fields[:id] = fields[:name]
  fields[:rho__id] = fields[:name]
  [:belongs_to, :schema].each do |key|
    fields[key] = fields[key].to_json if fields[key]
  end
  fields[:retry_limit] ||= 0
  fields[:simulate_time] ||= 0
  fields[:push_notify] ||= 'false'
end

.update_associations(sources) ⇒ Object



177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/rhoconnect/source.rb', line 177

# Rebuilds the has_many value of every source from the belongs_to
# declarations of the given sources.
#
# All listed sources first get has_many reset to nil. Then, for each source
# whose belongs_to parses (as JSON) to an array of single-entry hashes
# ({attribute => owner_model}), "<source>,<attribute>" is appended to the
# owner's comma-separated has_many string.
#
# Source.load returns nil for a name it does not know, so sources and owners
# that cannot be loaded are skipped with a warning instead of failing with a
# NoMethodError on nil.
#
# @param sources [Array] names of the sources to process
# @return [Array] the sources argument
def self.update_associations(sources)
  params = {:app_id => APP_NAME,:user_id => '*'}

  # first pass: drop stale associations
  sources.each do |source|
    loaded = Source.load(source, params)
    loaded.has_many = nil if loaded
  end

  sources.each do |source|
    s = Source.load(source, params)
    unless s
      log "WARNING: Source #{source} could not be loaded, associations skipped."
      next
    end
    next unless s.belongs_to

    belongs_to = JSON.parse(s.belongs_to)
    unless belongs_to.is_a?(Array)
      log "WARNING: Incorrect belongs_to format for #{source}, belongs_to should be an array."
      next
    end

    belongs_to.each do |entry|
      attrib = entry.keys[0]
      model = entry[attrib]
      owner = Source.load(model, params)
      unless owner
        log "WARNING: Unknown source #{model} referenced in belongs_to of #{source}, association skipped."
        next
      end
      pair = [source,attrib].join(',')
      existing = owner.has_many
      owner.has_many = (existing && existing.length > 0) ? "#{existing},#{pair}" : pair
    end
  end
end

Instance Method Details

#announce_changes ⇒ Object



430
431
432
433
434
435
436
437
# File 'lib/rhoconnect/source.rb', line 430

# Pings the source's user about changes in this source.
#
# Nothing is sent (and nil is returned) when push_notify? is false or when
# the source is :app partitioned; otherwise the result of User.ping is
# returned.
def announce_changes
  # TODO: currently we're not allowing 'Broadcast' push to all users for :app partitioned sources
  if push_notify? && self.partition.to_sym != :app
    recipients = [self.user_id]
    User.ping({'user_id' => recipients, 'sources' => [self.name]})
  end
end

#app ⇒ Object

Return the app the source belongs to



250
251
252
# File 'lib/rhoconnect/source.rb', line 250

# Return the app the source belongs to, loaded via App.load from app_id.
def app
  owner_app_id = self.app_id
  App.load(owner_app_id)
end

#assign_args(params) ⇒ Object



210
211
212
213
# File 'lib/rhoconnect/source.rb', line 210

# Copies params[:user_id] and params[:app_id] onto this source.
def assign_args(params)
  uid = params[:user_id]
  aid = params[:app_id]
  self.user_id = uid
  self.app_id = aid
end

#blob_attribs ⇒ Object



215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/rhoconnect/source.rb', line 215

# Lists the blob attributes declared in the source's JSON schema.
#
# Every entry of the schema's 'property' hash whose comma-separated value
# contains 'blob' contributes "<name>,<flag>", where flag is '1' when the
# value also contains 'overwrite' and '0' otherwise. Entries are sorted and
# joined with commas.
#
# @return [String] '' when there is no schema or the schema has no
#   'property' section
def blob_attribs
  raw_schema = self.schema
  return '' unless raw_schema

  properties = JSON.parse(raw_schema)['property']
  # a schema may omit 'property' altogether; nothing to report then
  return '' unless properties

  entries = []
  properties.each do |key,value|
    flags = value ? value.split(',') : []
    next unless flags.include?('blob')
    entries << "#{key},#{flags.include?('overwrite') ? '1' : '0'}"
  end
  entries.sort.join(',')
end

#check_refresh_time ⇒ Object



372
373
374
375
# File 'lib/rhoconnect/source.rb', line 372

# Tells whether the source is due for a refresh.
#
# A poll_interval of 0 is always due and -1 is never due; for any other
# value the source is due once read_state.refresh_time is not later than
# the current time.
def check_refresh_time
  return true if self.poll_interval == 0
  return false if self.poll_interval == -1
  self.read_state.refresh_time <= Time.now.to_i
end

#delete ⇒ Object



341
342
343
344
# File 'lib/rhoconnect/source.rb', line 341

# Flushes this source's stored data and, when rho__id is set, removes its
# entry from the @@model_data registry.
def delete
  flush_store_data
  return unless rho__id
  @@model_data.delete(rho__id.to_sym)
end

#delete_user_read_state ⇒ Object



270
271
272
273
274
# File 'lib/rhoconnect/source.rb', line 270

# Calls ReadState.delete_user with the key identifying this source's read
# state: app id, partition-dependent user and source name.
def delete_user_read_state
  state_key = {
    :app_id => self.app_id,
    :user_id => user_by_partition,
    :source_name => self.name
  }
  ReadState.delete_user(state_key)
end

#doc_suffix(doctype) ⇒ Object



276
277
278
# File 'lib/rhoconnect/source.rb', line 276

# Builds the "<user>:<source name>:<doctype>" suffix, where the user part
# comes from user_by_partition.
def doc_suffix(doctype)
  owner = user_by_partition
  source_name = self.name
  "#{owner}:#{source_name}:#{doctype}"
end

#flush_queue(doctype) ⇒ Object



327
328
329
330
331
332
# File 'lib/rhoconnect/source.rb', line 327

# Empties the queue document for the given doctype.
#
# The doctype is verified first; the flush on store 0 runs inside
# lock_queue_doc.
def flush_queue(doctype)
  verify_doctype(doctype)
  lock_queue_doc(doctype) do |source|
    store = Store.get_store(0)
    store.flush_zdata(source.queue_docname(doctype))
  end
end

#flush_store_data ⇒ Object



280
281
282
283
284
285
286
287
288
289
290
# File 'lib/rhoconnect/source.rb', line 280

# Removes the user's read state and flushes every document known to the
# class.
#
# Iterates self.class.valid_doctypes (docname => doctype): :queue entries go
# through flush_queue, :document entries through flush_data; any other
# doctype is left alone.
def flush_store_data
  delete_user_read_state
  self.class.valid_doctypes.each do |docname, doctype|
    if doctype == :queue
      flush_queue(docname)
    elsif doctype == :document
      flush_data(docname)
    end
  end
end

#get_queue(doctype) ⇒ Object



317
318
319
320
321
322
323
324
325
# File 'lib/rhoconnect/source.rb', line 317

# Reads the queue document for the given doctype without removing it.
#
# The doctype is verified first; the read from store 0 happens inside
# lock_queue_doc.
#
# @return [Array] a two-element array [data, keys]; both are empty arrays
#   if the lock block never runs
def get_queue(doctype)
  verify_doctype(doctype)
  entries = []
  entry_keys = []
  lock_queue_doc(doctype) do |source|
    entries, entry_keys = Store.get_store(0).get_zdata(source.queue_docname(doctype))
  end
  [entries, entry_keys]
end

#if_need_refresh(client_id = nil, params = nil) {|client_id, params| ... } ⇒ Object

Yields:

  • (client_id, params)


377
378
379
380
381
382
383
384
385
# File 'lib/rhoconnect/source.rb', line 377

# Yields (client_id, params) when the source is due for a refresh.
#
# Under lock(:md), check_refresh_time is evaluated; when it is truthy the
# read state's prev_refresh_time takes the old refresh_time and refresh_time
# is moved to now + poll_interval. The block is called after the lock
# section, only if the check was truthy.
def if_need_refresh(client_id=nil,params=nil)
  due = lock(:md) do |s|
    expired = check_refresh_time
    if expired
      self.read_state.prev_refresh_time = self.read_state.refresh_time
      self.read_state.refresh_time = Time.now.to_i + self.poll_interval
    end
    expired
  end
  yield client_id,params if due
end

#is_pass_through? ⇒ Boolean

Returns:

  • (Boolean)


422
423
424
# File 'lib/rhoconnect/source.rb', line 422

# True when pass_through is set and its string form is exactly 'true'.
# Returns pass_through itself (nil or false) when it is unset.
def is_pass_through?
  self.pass_through && self.pass_through.to_s == 'true'
end

#load_read_state ⇒ Object



264
265
266
267
268
# File 'lib/rhoconnect/source.rb', line 264

# Returns the result of ReadState.load for this source's read-state key
# (app id, partition-dependent user, source name).
def load_read_state
  state_key = {
    :app_id => self.app_id,
    :user_id => user_by_partition,
    :source_name => self.name
  }
  ReadState.load(state_key)
end

#lock_queue_doc(doctype) ⇒ Object

this data is not sharded



298
299
300
301
302
# File 'lib/rhoconnect/source.rb', line 298

# Runs the given block, passing this source, while holding the Store lock
# named after the doctype's queue document.
# this data is not sharded
def lock_queue_doc(doctype)
  lock_name = queue_docname(doctype)
  Store.lock(lock_name) { yield self }
end

#partition ⇒ Object



346
347
348
# File 'lib/rhoconnect/source.rb', line 346

# Returns partition_type converted to a symbol.
def partition
  type = self.partition_type
  type.to_sym
end

#partition=(value) ⇒ Object



350
351
352
# File 'lib/rhoconnect/source.rb', line 350

# Writer counterpart of #partition: stores the value in partition_type
# unchanged (no symbol conversion happens here).
def partition=(value)
  self.partition_type = value
end

#partition_name ⇒ Object



358
359
360
361
362
363
364
365
366
367
368
369
370
# File 'lib/rhoconnect/source.rb', line 358

# Returns the partition name used for this source's user documents.
#
# The default is user_id. When Rhoconnect::Model::Base.load_source_model
# returns a model class, that class's partition_name(user_id) is used
# instead. Errors while loading or asking the model are deliberately
# ignored and the user_id is returned.
#
# Only StandardError and ScriptError (e.g. LoadError) are ignored, so
# process-level exceptions such as Interrupt or SystemExit still propagate.
#
# @return [Object] the partition name, or user_id as the fallback
def partition_name
  uid = self.user_id
  # edge case - used in deleting documents
  return uid if uid == '*'

  begin
    model_klass = Rhoconnect::Model::Base.load_source_model(self)
    model_klass ? model_klass.partition_name(uid) : uid
  rescue StandardError, ScriptError
    # best effort: fall back to the default partition
    uid
  end
end

#poll_interval ⇒ Object



235
236
237
238
# File 'lib/rhoconnect/source.rb', line 235

# Reads the poll interval from Store under poll_interval_key.
#
# @return [Integer, nil] the stored value converted with to_i, or nil when
#   nothing is stored
def poll_interval
  stored = Store.get_value(poll_interval_key)
  stored.to_i if stored
end

#poll_interval=(interval) ⇒ Object



240
241
242
# File 'lib/rhoconnect/source.rb', line 240

# Writes the poll interval to Store under poll_interval_key.
def poll_interval=(interval)
  key = poll_interval_key
  Store.put_value(key, interval)
end

#process_queue(doctype) ⇒ Object

this is an atomic operation

  • lock queue, get queue data, flush queue, unlock queue



306
307
308
309
310
311
312
313
314
315
# File 'lib/rhoconnect/source.rb', line 306

# Reads and then empties the queue document for the given doctype.
#
# Both store operations run inside a single lock_queue_doc section, so the
# read and the flush happen under the same lock.
#
# @return [Array] a two-element array [data, keys]; both are empty arrays
#   if the lock block never runs
def process_queue(doctype)
  verify_doctype(doctype)
  entries = []
  entry_keys = []
  lock_queue_doc(doctype) do |source|
    entries, entry_keys = Store.get_store(0).get_zdata(source.queue_docname(doctype))
    Store.get_store(0).flush_zdata(source.queue_docname(doctype))
  end
  [entries, entry_keys]
end

#push_notify? ⇒ Boolean

Returns:

  • (Boolean)


426
427
428
# File 'lib/rhoconnect/source.rb', line 426

# True when push_notify is set and its string form is exactly 'true'.
# Returns push_notify itself (nil or false) when it is unset.
def push_notify?
  self.push_notify && self.push_notify.to_s == 'true'
end

#push_queue(doctype, assoc_key, data = [], append = false) ⇒ Object



334
335
336
337
338
339
# File 'lib/rhoconnect/source.rb', line 334

# Writes data into the queue document for the given doctype.
#
# The doctype is verified first; put_zdata on store 0 runs inside
# lock_queue_doc and receives the queue docname, assoc_key, data and the
# append flag unchanged.
def push_queue(doctype,assoc_key, data=[],append=false)
  verify_doctype(doctype)
  lock_queue_doc(doctype) do |source|
    store = Store.get_store(0)
    store.put_zdata(source.queue_docname(doctype), assoc_key, data, append)
  end
end

#queue_docname(dockey) ⇒ Object



292
293
294
295
# File 'lib/rhoconnect/source.rb', line 292

# Builds the Store key of a queue document:
# "<class prefix>:<app id>:<source name>:<dockey>".
#
# NOTE(review): the previous comment said queues are "bound by user - not
# shared", but no user id is part of this key — confirm the intent.
def queue_docname(dockey)
  prefix = self.class.class_prefix(self.class)
  "#{prefix}:#{self.app_id}:#{self.name}:#{dockey}"
end

#read_state ⇒ Object



258
259
260
261
262
# File 'lib/rhoconnect/source.rb', line 258

# Returns this source's read state, creating it when none can be loaded.
#
# load_read_state is tried first; if it returns nil/false, ReadState.create
# is called with the app id, partition-dependent user and source name.
def read_state
  state_key = {
    :app_id => self.app_id,
    :user_id => user_by_partition,
    :source_name => self.name
  }
  existing = load_read_state
  existing || ReadState.create(state_key)
end

#rewind_refresh_time(query_failure) ⇒ Object



387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
# File 'lib/rhoconnect/source.rb', line 387

# Adjusts the read state's retry bookkeeping after a query and, on a failed
# query that is still within the retry limit, rewinds refresh_time to
# prev_refresh_time.
#
# Does nothing when poll_interval is 0. Everything else runs inside
# lock(:md).
#
# @param query_failure [Boolean] truthy when the last query failed
def rewind_refresh_time(query_failure)
  return if self.poll_interval == 0
  lock(:md) do |s|
    rewind_time = false
    # reset the number of retries
    # and prev_refresh_time on a successful query
    # or if the last refresh was more than 'poll_interval' time ago
    if not query_failure or ((Time.now.to_i - self.read_state.prev_refresh_time) >= self.poll_interval)
      # we need to reset the prev_refresh_time here
      # otherwise in case of expired poll interval
      # and repeating failures - it will reset the counter
      # on every error
      self.read_state.prev_refresh_time = Time.now.to_i
      self.read_state.retry_counter = 0
    end

    # rewind the refresh time on failure
    # if retry limit is not reached
    if query_failure
      if self.read_state.retry_counter < self.retry_limit
        self.read_state.increment!(:retry_counter)
        rewind_time = true
      else
        # we have reached the limit - do not rewind the refresh time
        # and reset the counter
        self.read_state.retry_counter = 0
      end
    end

    # note: at this point prev_refresh_time may already have been reset to
    # the current time by the first branch above
    if rewind_time
      self.read_state.refresh_time = self.read_state.prev_refresh_time
    end
  end
end

#schema ⇒ Object



254
255
256
# File 'lib/rhoconnect/source.rb', line 254

# Returns the value stored under :schema, as read by get_value.
def schema
  get_value(:schema)
end

#store_index(doctype) ⇒ Object



144
145
146
147
148
# File 'lib/rhoconnect/source.rb', line 144

# Returns the store index for the given doctype, as computed by
# compute_store_index from the doctype, this source and its user_id.
def store_index(doctype)
  # app-partitioned sources go to 0
  # everything else is sharded
  owner_id = self.user_id
  compute_store_index(doctype, self, owner_id)
end

#update(fields) ⇒ Object



230
231
232
233
# File 'lib/rhoconnect/source.rb', line 230

# Normalizes the given fields: converts them with with_indifferent_access
# and runs the class-level set_defaults on the converted hash, returning
# whatever set_defaults returns.
#
# NOTE(review): the normalized hash is not written back to this instance
# here — confirm whether that is intended.
def update(fields)
  # indifferent access lets set_defaults use symbol keys
  normalized = fields.with_indifferent_access
  self.class.set_defaults(normalized)
end

#user ⇒ Object

Return the user associated with a source



245
246
247
# File 'lib/rhoconnect/source.rb', line 245

# Return the user associated with a source, loaded via User.load from
# user_id.
def user
  owner_id = self.user_id
  User.load(owner_id)
end

#user_by_partition ⇒ Object



354
355
356
# File 'lib/rhoconnect/source.rb', line 354

# Returns the user component used in document keys: partition_name for
# :user partitioned sources, the literal '__shared__' for everything else.
def user_by_partition
  return '__shared__' unless self.partition.to_sym == :user
  partition_name
end