Class: Rhoconnect::ClientSync

Inherits:
Object
  • Object
show all
Defined in:
lib/rhoconnect/client_sync.rb

Constant Summary collapse

UNKNOWN_CLIENT =
"Unknown client"
UNKNOWN_SOURCE =
"Unknown source"
SYNC_VERSION =

TODO : Remove in Rhoconnect 4.0

3

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(source, client, p_size = nil) ⇒ ClientSync

Returns a new instance of ClientSync.

Raises:

  • (ArgumentError)


11
12
13
14
15
16
17
# File 'lib/rhoconnect/client_sync.rb', line 11

def initialize(source,client,p_size=nil)
  raise ArgumentError.new(UNKNOWN_CLIENT) unless client
  raise ArgumentError.new(UNKNOWN_SOURCE) unless source
  @source,@client,@p_size = source,client,p_size ? p_size.to_i : 500
  @client.last_sync = Time.now if @client
  @source_sync = SourceSync.new(@source)
end

Instance Attribute Details

#clientObject

Returns the value of attribute client.



3
4
5
# File 'lib/rhoconnect/client_sync.rb', line 3

def client
  @client
end

#p_sizeObject

Returns the value of attribute p_size.



3
4
5
# File 'lib/rhoconnect/client_sync.rb', line 3

def p_size
  @p_size
end

#sourceObject

Returns the value of attribute source.



3
4
5
# File 'lib/rhoconnect/client_sync.rb', line 3

def source
  @source
end

#source_syncObject

Returns the value of attribute source_sync.



3
4
5
# File 'lib/rhoconnect/client_sync.rb', line 3

def source_sync
  @source_sync
end

Class Method Details

.bulk_data(partition, client, sources = nil) ⇒ Object

Raises:

  • (ArgumentError)


265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
# File 'lib/rhoconnect/client_sync.rb', line 265

def bulk_data(partition,client,sources=nil)
  raise ArgumentError.new(UNKNOWN_CLIENT) unless client
  name = BulkData.get_name(partition,client.user_id)
  data = BulkData.load(name)

  partition_sources = client.app.partition_sources(partition,client.user_id)	
  sources ||= partition_sources
  return {:result => :nop} if sources.length <= 0

  do_bd_sync = data.nil?
  do_bd_sync = (data.completed? and 
      (data.refresh_time <= Time.now.to_i or !data.dbfiles_exist?)) unless do_bd_sync

  if do_bd_sync  
    data.delete if data
    data = BulkData.create(:name => name,
      :app_id => client.app_id,
      :user_id => client.user_id,
      :partition_sources => partition_sources,
      :sources => sources,
      :refresh_time => Time.now.to_i + Rhoconnect.bulk_sync_poll_interval)
    BulkData.enqueue("data_name" => name)
  end
  
  if data and data.completed? and data.dbfiles_exist?
    client.update_clientdoc(sources)
    sources.each do |src|
      s = Source.load(src, {:user_id => client.user_id, :app_id => client.app_id})
      errordoc = s.docname(:errors)
      errors = {}
      Store.lock(errordoc) do
        errors = Store.get_data(errordoc)
      end
      unless errors.empty?
        # FIXME: :result => :bulk_sync_error, :errors => "#{errors}"
        log "Bulk sync errors are found in #{src}: #{errors}"
        # Delete all related bulk files
        data.delete_files
        return {:result => :url, :url => ''}
      end
    end
    {:result => :url, :url => data.url}
  elsif data
    {:result => :wait}
  end
end

.reset(client, params = nil) ⇒ Object

Resets the store for a given app,client



238
239
240
241
242
243
244
245
246
247
# File 'lib/rhoconnect/client_sync.rb', line 238

def reset(client, params=nil)
  return unless client
  if params == nil or params[:sources] == nil
    client.flash_data('*')
  else
    params[:sources].each do |source|
      client.flash_source_data('*', source['name'])
    end
  end
end

.search_all(client, params = nil) ⇒ Object

Raises:

  • (ArgumentError)


249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/rhoconnect/client_sync.rb', line 249

def search_all(client,params=nil)
  raise ArgumentError.new(UNKNOWN_CLIENT) unless client
  return [] unless params[:sources]
  res = []
  params[:sources].each do |source|
    s = Source.load(source['name'],{:app_id => client.app_id,
      :user_id => client.user_id})
    client.source_name = source['name']
    cs = ClientSync.new(s,client,params[:p_size])
    params[:token] = source['token'] if source['token']
    search_res = cs.search(params)
    res << search_res if search_res
  end
  res
end

Instance Method Details

#build_page {|res| ... } ⇒ Object

Yields:

  • (res)


60
61
62
63
64
65
66
# File 'lib/rhoconnect/client_sync.rb', line 60

def build_page
  res = {}
  yield res
  res.reject! {|key,value| value.nil? or value.empty?}
  res.merge!(_send_errors)
  res
end

#compute_deleted_pageObject

Computes deleted objects (down to individual attributes) in the client document, trims it to page size, stores page, and returns page as hash



202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/rhoconnect/client_sync.rb', line 202

def compute_deleted_page
  res = {}
  delete_page_doc = @client.docname(:delete_page)
  page_size = @p_size
  diff = @source.lock(:md) { |s| Store.get_diff_data(s.docname(:md),@client.docname(:cd),@p_size) }
  diff.each do |key,value|
    res[key] = value
    Store.put_object(delete_page_doc,key,value)
    page_size -= 1
    break if page_size <= 0          
  end
  res
end

#compute_errors_pageObject

Computes errors for client and stores a copy as errors page



217
218
219
220
221
222
223
224
225
226
# File 'lib/rhoconnect/client_sync.rb', line 217

def compute_errors_page
  ['create','update','delete'].each do |operation|
    @client.lock("#{operation}_errors") do |c| 
      c.rename("#{operation}_errors","#{operation}_errors_page")
    end
  end
  @client.lock("update_rollback") do |c|
    c.rename("update_rollback","update_rollback_page")
  end
end

Computes create links for a client and stores a copy as links page



229
230
231
232
233
234
# File 'lib/rhoconnect/client_sync.rb', line 229

def compute_links_page
  @client.lock(:create_links) do |c| 
    c.rename(:create_links,:create_links_page)
    c.get_data(:create_links_page)
  end
end

#compute_metadataObject

Computes the metadata sha1 and returns metadata if client’s sha1 doesn’t match source’s sha1



156
157
158
159
160
161
162
163
164
# File 'lib/rhoconnect/client_sync.rb', line 156

def 
  , = @source.lock(:metadata) do |s|
    [s.get_value(:metadata_sha1),s.get_value(:metadata)]
  end
  return if @client.get_value(:metadata_sha1) == 
  @client.put_value(:metadata_sha1,)
  @client.put_value(:metadata_page,)
  
end

#compute_pageObject

Computes diffs between master doc and client doc, trims it to page size, stores page, and returns page as hash



169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/rhoconnect/client_sync.rb', line 169

def compute_page
  res,total_count = @source.lock(:md) do |s| 
    res = Store.get_diff_data(@client.docname(:cd),s.docname(:md),@p_size)
    total_count = s.get_value(:md_size).to_i
    [res,total_count]
  end
  # until sync is not done - set cd_size to 0
  # once there are no changes, then, set cd_size to md_size
  cd_size = res.size > 0 ? 0 : total_count
  @client.put_value(:cd_size, cd_size)
  @client.put_data(:page,res)
  @client.put_value(:total_count_page,total_count)
  [total_count,res]
end

#compute_searchObject

Computes search result, updates md for source and cd for client with the result



185
186
187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/rhoconnect/client_sync.rb', line 185

def compute_search
  client_res = Store.get_diff_data(@client.docname(:cd),@client.docname(:search),@p_size)
  @client.put_data(:cd,client_res,true)
  @client.update_count(:cd_size,client_res.size)
  @client.put_data(:search_page,client_res)
  
  @source.lock(:md) do |s|
    source_diff = Store.get_diff_data(s.docname(:md),@client.docname(:cd))
    s.put_data(:md,source_diff,true)
    s.update_count(:md_size,source_diff.size)
  end
  
  [client_res,client_res.size]
end

#receive_cud(cud_params = {}, query_params = nil) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/rhoconnect/client_sync.rb', line 19

def receive_cud(cud_params={},query_params=nil)
  if @source.is_pass_through?
    @source_sync.pass_through_cud(cud_params,query_params)
  else
    _process_blobs(cud_params)
    processed = 0
    ['create','update','delete'].each do |op|
      key,value = op,cud_params[op]
      processed += _receive_cud(key,value) if value
    end
    @source_sync.process_cud
  end
end

#resend_page(token = nil) ⇒ Object

Resend token for a client, also sends exceptions



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/rhoconnect/client_sync.rb', line 117

def resend_page(token=nil)
  token,progress_count,total_count,res = '',0,0,{}
  schema_page = @client.get_value(:schema_page)
  if schema_page
    res = {'schema-changed' => 'true'}
  else  
    res = build_page do |r|
      r['insert'] = @client.get_data(:page)
      r['delete'] = @client.get_data(:delete_page)
      r['links'] = @client.get_data(:create_links_page)
      r['metadata'] = @client.get_value(:metadata_page)
      progress_count = 0
      total_count = @client.get_value(:total_count_page).to_i
    end
  end
  token = @client.get_value(:page_token)
  [token,progress_count,total_count,res]
end

#schema_changed?Boolean

Checks if schema changed

Returns:

  • (Boolean)


137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/rhoconnect/client_sync.rb', line 137

def schema_changed?
  if @source_sync.adapter.respond_to?(:schema)
    schema_sha1 = @source.get_value(:schema_sha1)
    if @client.get_value(:schema_sha1).nil?
      @client.put_value(:schema_sha1,schema_sha1)
      return false
    elsif @client.get_value(:schema_sha1) == schema_sha1
      return false
    end
    @client.put_value(:schema_sha1,schema_sha1)
    @client.put_value(:schema_page,schema_sha1)
    return true
  else
    return false
  end
end

#search(params) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
# File 'lib/rhoconnect/client_sync.rb', line 48

def search(params)
  if params
    return _resend_search_result if params[:token] and params[:resend]
    if params[:token] and !_ack_search(params[:token]) 
      formatted_result = _format_search_result
      _delete_search
      return formatted_result
    end
  end
  _do_search(params)
end

#send_cud(token = nil, query_params = nil) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/rhoconnect/client_sync.rb', line 33

def send_cud(token=nil,query_params=nil)
  res = []
  if not _ack_token(token) and not @source.is_pass_through?
    res = resend_page(token)
  else
    query_result = @source_sync.process_query(query_params)
    if @source.is_pass_through?
      res = send_pass_through_data(query_result)
    else
      res = send_new_page
    end
  end
  _format_result(res[0],res[1],res[2],res[3])
end

#send_new_pageObject



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/rhoconnect/client_sync.rb', line 68

def send_new_page
  token,progress_count,total_count,res = '',0,0,{}
  if schema_changed?
    _expire_bulk_data
    token = compute_token(@client.docname(:page_token))
    res = {'schema-changed' => 'true'}
  else  
    compute_errors_page
    res = build_page do |r|
      total_count,r['insert'] = compute_page
      r['delete'] = compute_deleted_page
      r['links'] = compute_links_page
      r['metadata'] = 
    end
    if res['insert'] or res['delete'] or res['links']
      token = compute_token(@client.docname(:page_token))
    else
      _delete_errors_page 
    end
    @client.put_data(:cd,res['insert'],true)      
    @client.delete_data(:cd,res['delete'])
    # TODO: progress count can not be computed properly
    # without comparing what has actually changes
    # so we need to obsolete it in the future versions
    progress_count = 0
  end
  [token,progress_count,total_count,res]
end

#send_pass_through_data(data) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/rhoconnect/client_sync.rb', line 97

def send_pass_through_data(data)
  data ||= {}
  data.each_key do |object_id| 
    data[object_id].each { |attrib,value| data[object_id][attrib] = '' if value.nil? }
  end
  token = ''
  compute_errors_page
  res = build_page do |r|
    r['insert'] = data
    r['metadata'] = 
  end
  if res['insert']
    token = compute_token(@client.docname(:page_token))
  else
    _delete_errors_page 
  end    
  [token,0,data.size,res]
end