Class: Rhoconnect::Handler::Query::Runner

Inherits:
Object
  • Object
show all
Defined in:
lib/rhoconnect/handler/query/runner.rb

Direct Known Subclasses

PassThroughRunner

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(model, client, route_handler, params = {}) ⇒ Runner

Returns a new instance of Runner.

Raises:

  • (ArgumentError)


8
9
10
11
12
13
14
15
16
17
# File 'lib/rhoconnect/handler/query/runner.rb', line 8

# Builds a runner for one client/source pair and creates its query engine.
#
# As part of construction the client's +last_sync+ is set to the current time.
#
# @param model [Object] model whose +source+ (and that source's +app+) must be present
# @param client [Object] client being synced; must be present
# @param route_handler [Object] handed unchanged to the query engine
# @param params [Hash] request parameters; +:p_size+ is the page size (500 when absent)
# @raise [ArgumentError] when client, model, model.source or model.source.app is missing
def initialize(model,client,route_handler, params = {})
  raise ArgumentError, UNKNOWN_CLIENT unless client
  raise ArgumentError, UNKNOWN_SOURCE if !model || !model.source
  raise ArgumentError, 'Invalid app for source' unless model.source.app

  @source = model.source
  @client = client
  @p_size = (params[:p_size] || 500).to_i
  # client was validated above, so no nil guard is needed here
  @client.last_sync = Time.now
  @params = params
  @engine = Rhoconnect::Handler::Query::Engine.new(model, route_handler, @params)
end

Instance Attribute Details

#client ⇒ Object

Returns the value of attribute client.



6
7
8
# File 'lib/rhoconnect/handler/query/runner.rb', line 6

# Reader for @client, which the constructor assigns from its +client+ argument.
#
# @return [Object] the stored client
def client
  @client
end

#engine ⇒ Object

Returns the value of attribute engine.



6
7
8
# File 'lib/rhoconnect/handler/query/runner.rb', line 6

# Reader for @engine, the Rhoconnect::Handler::Query::Engine instance
# created by the constructor.
#
# @return [Object] the stored query engine
def engine
  @engine
end

#p_size ⇒ Object

Returns the value of attribute p_size.



6
7
8
# File 'lib/rhoconnect/handler/query/runner.rb', line 6

# Reader for @p_size, the page size the constructor derives from
# params[:p_size] (converted with to_i), falling back to 500.
#
# @return [Integer] the stored page size
def p_size
  @p_size
end

#params ⇒ Object

Returns the value of attribute params.



6
7
8
# File 'lib/rhoconnect/handler/query/runner.rb', line 6

# Reader for @params, the params hash given to the constructor.
#
# @return [Hash] the stored request parameters
def params
  @params
end

#source ⇒ Object

Returns the value of attribute source.



6
7
8
# File 'lib/rhoconnect/handler/query/runner.rb', line 6

# Reader for @source, which the constructor assigns from model.source.
#
# @return [Object] the stored source
def source
  @source
end

Instance Method Details

#ack_token(token) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/rhoconnect/handler/query/runner.rb', line 51

# Acknowledges the page token a client sent back.
#
# When the client has no stored :page_token there is nothing to acknowledge
# and the call succeeds. When the given token equals the stored one, the
# stored token is cleared, every stored page document is flushed and the
# errors page is deleted. Any other token is rejected without side effects.
#
# @param token [String, nil] token supplied by the client
# @return [Boolean] true when acknowledged (or nothing was pending), false otherwise
def ack_token(token)
  pending = @client.get_value(:page_token)
  return true unless pending
  return false unless token && pending == token

  @client.put_value(:page_token, nil)
  [:schema_page, :metadata_page, :create_links_page, :page, :delete_page].each do |doc|
    @client.flush_data(doc)
  end
  _delete_errors_page
  true
end

#build_page {|res| ... } ⇒ Object

Yields:

  • (res)


82
83
84
85
86
87
88
# File 'lib/rhoconnect/handler/query/runner.rb', line 82

# Assembles a page hash.
#
# Yields an empty hash for the caller to fill in, drops every entry whose
# value is nil or empty, then merges in whatever _send_errors returns
# (merged entries are not filtered).
#
# @yield [res] the hash to populate
# @yieldparam res [Hash] initially empty page hash
# @return [Hash] the same hash after filtering and merging
def build_page
  page = {}
  yield page
  page.delete_if { |_section, payload| payload.nil? || payload.empty? }
  page.merge!(_send_errors)
end

#compute_errors_page ⇒ Object

Computes errors for client and stores a copy as errors page



223
224
225
226
227
228
229
230
231
232
# File 'lib/rhoconnect/handler/query/runner.rb', line 223

# Moves the client's pending error documents to their "_page" counterparts.
#
# For create_errors, update_errors, delete_errors and update_rollback (in
# that order) the client document is locked and renamed to "<name>_page".
#
# @return [Object] whatever the final lock call (update_rollback) returns
def compute_errors_page
  pending_docs = ['create','update','delete'].map { |operation| "#{operation}_errors" }
  pending_docs << "update_rollback"
  lock_results = pending_docs.map do |doc|
    @client.lock(doc) { |locked| locked.rename(doc, "#{doc}_page") }
  end
  lock_results.last
end

#compute_links_page ⇒ Object

Computes create links for a client and stores a copy as the links page



235
236
237
238
239
240
# File 'lib/rhoconnect/handler/query/runner.rb', line 235

# Snapshots the client's create links as the links page.
#
# Under a lock on :create_links, renames that document to
# :create_links_page and reads the renamed document back.
#
# @return [Object] the data read from :create_links_page (the lock's result)
def compute_links_page
  live_doc, page_doc = :create_links, :create_links_page
  @client.lock(live_doc) do |locked|
    locked.rename(live_doc, page_doc)
    locked.get_data(page_doc)
  end
end

#compute_metadata ⇒ Object

Computes the metadata sha1 and returns metadata if client’s sha1 doesn’t match source’s sha1



162
163
164
165
166
167
168
169
170
# File 'lib/rhoconnect/handler/query/runner.rb', line 162

# Returns the source metadata when the client's copy is out of date.
#
# Reads :metadata_sha1 and :metadata from the source under a lock on
# :metadata. If the client's stored :metadata_sha1 already equals the
# source's, nothing is written and nil is returned. Otherwise the client's
# :metadata_sha1 is updated, the metadata is stored as :metadata_page, and
# the metadata is returned.
#
# @return [Object, nil] the source metadata, or nil when the sha1 values match
def compute_metadata
  metadata_sha1, metadata = @source.lock(:metadata) do |s|
    [s.get_value(:metadata_sha1),s.get_value(:metadata)]
  end
  return if @client.get_value(:metadata_sha1) == metadata_sha1
  @client.put_value(:metadata_sha1,metadata_sha1)
  @client.put_value(:metadata_page,metadata)
  metadata
end

#compute_page ⇒ Object

Computes diffs between master doc and client doc, trims it to page size, stores page, and returns page as hash



175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/rhoconnect/handler/query/runner.rb', line 175

# Computes the next page of changes for the client and stores it.
#
# Under a lock on the source's :md document it collects the insert diff
# (client :cd against source :md), the source's :md_size, and the delete
# diff (source :md against client :cd), each diff limited by @p_size. The
# exact inserts/deletes are then resolved through Store.get_inserts_deletes
# and written to the client's :page and :delete_page documents, along with
# :total_count_page, and the client's :cd document is updated.
#
# @return [Array] [total_count, inserts, deletes]
def compute_page
  diff = @source.lock(:md) do |master|
    to_insert = @client.get_diff_data(:cd,master.docname(:md),@p_size)
    master_size = master.get_value(:md_size).to_i
    to_delete = master.get_diff_data(:md,@client.docname(:cd),@p_size)
    [to_insert,to_delete,master_size]
  end
  insert_elements, delete_elements, total_count = diff

  # While inserts are still pending the client doc size is reported as 0;
  # once no inserts remain it is set to the master doc size.
  cd_size =
    if insert_elements.size > 0
      0
    else
      total_count
    end
  @client.put_value(:cd_size, cd_size)

  # Resolve the element maps into the exact changes to send.
  inserts, deletes = Store.get_inserts_deletes(insert_elements, delete_elements)

  @client.put_data(:page, inserts)
  @client.put_data(:delete_page, deletes, true)
  @client.put_value(:total_count_page, total_count)
  @client.update_elements(:cd, insert_elements, delete_elements)

  [total_count, inserts, deletes]
end

#compute_page_bruteforce ⇒ Object

Computes diffs between master doc and client doc, trims it to page size, stores page, and returns page as hash



200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/rhoconnect/handler/query/runner.rb', line 200

# Computes the next page of changes for the client using the client's
# get_diff_data_bruteforce, and stores it.
#
# Under a lock on the source's :md document it obtains both the insert and
# delete element maps from a single get_diff_data_bruteforce call (client
# :cd against source :md, limited by @p_size) and reads the source's
# :md_size. The exact inserts/deletes are resolved through
# Store.get_inserts_deletes and written to the client's :page and
# :delete_page documents, along with :total_count_page, and the client's
# :cd document is updated.
#
# @return [Array] [total_count, inserts, deletes]
def compute_page_bruteforce
  diff = @source.lock(:md) do |master|
    to_insert, to_delete = @client.get_diff_data_bruteforce(:cd,master.docname(:md),@p_size)
    master_size = master.get_value(:md_size).to_i
    [to_insert,to_delete,master_size]
  end
  insert_elements, delete_elements, total_count = diff

  # While inserts are still pending the client doc size is reported as 0;
  # once no inserts remain it is set to the master doc size.
  cd_size =
    if insert_elements.size > 0
      0
    else
      total_count
    end
  @client.put_value(:cd_size, cd_size)

  # Resolve the element maps into the exact changes to send.
  inserts, deletes = Store.get_inserts_deletes(insert_elements, delete_elements)

  @client.put_data(:page, inserts)
  @client.put_data(:delete_page, deletes, true)
  @client.put_value(:total_count_page, total_count)
  @client.update_elements(:cd, insert_elements, delete_elements)

  [total_count, inserts, deletes]
end

#format_result(token, progress_count, total_count, res) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
# File 'lib/rhoconnect/handler/query/runner.rb', line 70

# Wraps a page in the response envelope.
#
# The envelope is an array of single-key hashes (version, token, count,
# progress_count, total_count) followed by the page itself. 'count' is the
# combined length of the page's 'insert' and 'delete' entries; a missing
# entry contributes 0. A nil token is emitted as an empty string.
#
# @param token [String, nil] page token
# @param progress_count [Integer] progress counter to report
# @param total_count [Integer] total record count to report
# @param res [Hash] page contents
# @return [Array] the envelope
def format_result(token,progress_count,total_count,res)
  count = ['insert', 'delete'].inject(0) do |sum, section|
    res[section] ? sum + res[section].length : sum
  end
  [
    {'version' => Rhoconnect::SYNC_VERSION},
    {'token' => token || ''},
    {'count' => count},
    {'progress_count' => progress_count},
    {'total_count' => total_count},
    res
  ]
end

#resend_page(token = nil) ⇒ Object

Resend token for a client, also sends exceptions



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/rhoconnect/handler/query/runner.rb', line 32

# Rebuilds the page previously stored for the client so it can be sent again.
#
# When the client has a stored :schema_page the page is just the
# schema-changed marker and total_count stays 0. Otherwise the page is built
# (via build_page) from the stored :page, :delete_page, :create_links_page
# and :metadata_page, and total_count comes from :total_count_page.
#
# @param token [String, nil] accepted for interface compatibility; the value
#   is not used - the returned token is always the client's stored :page_token
# @return [Array] [token, progress_count (always 0), total_count, page]
def resend_page(token=nil)
  total_count = 0
  page =
    if @client.get_value(:schema_page)
      {'schema-changed' => 'true'}
    else
      build_page do |r|
        r['insert'] = @client.get_data(:page)
        r['delete'] = @client.get_data(:delete_page)
        r['links'] = @client.get_data(:create_links_page)
        r['metadata'] = @client.get_value(:metadata_page)
        total_count = @client.get_value(:total_count_page).to_i
      end
    end
  [@client.get_value(:page_token), 0, total_count, page]
end

#run ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
# File 'lib/rhoconnect/handler/query/runner.rb', line 19

# Entry point: produces the formatted response for one query request.
#
# If the token in params[:token] is acknowledged, the engine sync is run and
# a new page is sent; otherwise the previously stored page is resent. The
# resulting four-element array is passed to format_result.
#
# @return [Array] the envelope produced by format_result
def run
  token = params[:token]
  page =
    if ack_token(token)
      @engine.do_sync
      send_new_page
    else
      resend_page(token)
    end
  format_result(page[0], page[1], page[2], page[3])
end

#schema_changed? ⇒ Boolean

Checks if schema changed

Returns:

  • (Boolean)


143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/rhoconnect/handler/query/runner.rb', line 143

# Reports whether the source schema differs from the one the client last saw.
#
# Only applies when the engine's model responds to :schema. A client with no
# stored :schema_sha1 is given the source's value and treated as unchanged.
# When the stored value differs from the source's, the client's :schema_sha1
# is updated and the new sha1 is also stored as :schema_page.
#
# @return [Boolean] true only when a previously recorded schema sha1 differs
def schema_changed?
  return false unless engine.model.respond_to?(:schema)

  source_sha1 = @source.get_value(:schema_sha1)
  first_seen = @client.get_value(:schema_sha1).nil?
  # The second read only happens when the client already has a sha1 recorded.
  return false if !first_seen && @client.get_value(:schema_sha1) == source_sha1

  @client.put_value(:schema_sha1, source_sha1)
  return false if first_seen

  @client.put_value(:schema_page, source_sha1)
  true
end

#send_new_page ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/rhoconnect/handler/query/runner.rb', line 90

# Computes and stores a fresh page for the client.
#
# When the schema changed, bulk data is expired, a new page token is
# computed and the page is just the schema-changed marker. Otherwise the
# errors page is computed and the page is built from compute_page (inserts,
# deletes and total count), compute_links_page and compute_metadata. A page
# token is only computed when the page carries inserts, deletes or links;
# when it carries none of these the errors page is deleted and the token
# stays empty.
#
# @return [Array] [token, progress_count (always 0), total_count, page]
def send_new_page
  token, total_count = '', 0
  if schema_changed?
    _expire_bulk_data
    token = @client.compute_token(:page_token)
    res = {'schema-changed' => 'true'}
  else
    compute_errors_page
    res = build_page do |r|
      total_count,r['insert'],r['delete'] = compute_page
      r['links'] = compute_links_page
      r['metadata'] = compute_metadata
    end
    if res['insert'] || res['delete'] || res['links']
      token = @client.compute_token(:page_token)
    else
      _delete_errors_page
    end
  end
  # TODO: progress count cannot be computed properly
  # without comparing what has actually changed,
  # so it should be made obsolete in a future version
  progress_count = 0
  [token,progress_count,total_count,res]
end

#send_new_page_bruteforce ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/rhoconnect/handler/query/runner.rb', line 116

# Computes and stores a fresh page for the client, using
# compute_page_bruteforce to obtain the inserts and deletes.
#
# When the schema changed, bulk data is expired, a new page token is
# computed and the page is just the schema-changed marker. Otherwise the
# errors page is computed and the page is built from compute_page_bruteforce
# (inserts, deletes and total count), compute_links_page and
# compute_metadata. A page token is only computed when the page carries
# inserts, deletes or links; when it carries none of these the errors page
# is deleted and the token stays empty.
#
# @return [Array] [token, progress_count (always 0), total_count, page]
def send_new_page_bruteforce
  token, total_count = '', 0
  if schema_changed?
    _expire_bulk_data
    token = @client.compute_token(:page_token)
    res = {'schema-changed' => 'true'}
  else
    compute_errors_page
    res = build_page do |r|
      total_count,r['insert'],r['delete'] = compute_page_bruteforce
      r['links'] = compute_links_page
      r['metadata'] = compute_metadata
    end
    if res['insert'] || res['delete'] || res['links']
      token = @client.compute_token(:page_token)
    else
      _delete_errors_page
    end
  end
  # TODO: progress count cannot be computed properly
  # without comparing what has actually changed,
  # so it should be made obsolete in a future version
  progress_count = 0
  [token,progress_count,total_count,res]
end