Class: Archipelago::Sanitation::Officer

Inherits:
Client::Base show all
Defined in:
lib/archipelago/sanitation.rb

Overview

The client class for the redundant Archipelago::Dump network.

Keeps track of our sites and writes and reads data.

Also keeps track of all the redundancy work needed, but lets Site do the work.

Instance Attribute Summary

Attributes inherited from Client::Base

#debug_callable, #jockey, #service_descriptions, #services

Attributes included from Disco::Camel

#jockey

Instance Method Summary collapse

Methods inherited from Client::Base

#around_update_services, #method_missing, #setup_client, #stop!, #update_services!

Constructor Details

#initialize(options = {}) ⇒ Officer

Returns a new instance of Officer.



86
87
88
# File 'lib/archipelago/sanitation.rb', line 86

def initialize(options = {})
  setup(options)
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method in the class Archipelago::Client::Base

Instance Method Details

#[](key) ⇒ Object

Get the data for key in the site network.

The key must be a SHA1 hash.



141
142
143
# File 'lib/archipelago/sanitation.rb', line 141

def [](key)
  fetch(key).first
end

#[]=(key, value, t = [Time.now.to_i].pack("I")) ⇒ Object

Write key and value into the site network with a good level of redundancy etc.

The key should must be a SHA1 hash.

Optionally the timestamp t can be provided, but it defaults to now.



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/archipelago/sanitation.rb', line 111

def []=(key, value, t = [Time.now.to_i].pack("I"))
  super_string = Oneliner::SuperString.new(value)
  nr_of_needed_chunks = @minimum_nr_of_chunks / @minimum_redundancy_ratio
  chunk_size = (super_string.size / nr_of_needed_chunks) + @metadata_overhead
  chunk_size = @minimum_recoverable_size / nr_of_needed_chunks if chunk_size < @minimum_recoverable_size / nr_of_needed_chunks

  dump_hash = responsible_sites(key)
  super_string.encode(8)
  dump_hash.t_each do |dump_id, nr_of_chunks_needed|
    @debug_callable.call("calling #{dump_id}.insert!(#{key}, ..., #{t})") if @debug_callable
    self.sites[dump_id][:service].insert!(key, 
                                          (0...nr_of_chunks_needed).collect do |nr_of_chunks_needed|
                                            super_string.encode(chunk_size)
                                          end,
                                          t)
  end
end

#belongs_at?(service_id, key) ⇒ Boolean

Returns whether the key belongs at the service with given id.

Returns:

  • (Boolean)


167
168
169
# File 'lib/archipelago/sanitation.rb', line 167

def belongs_at?(service_id, key)
  responsible_sites(key).include?(service_id)
end

#delete!(key) ⇒ Object



129
130
131
132
133
134
# File 'lib/archipelago/sanitation.rb', line 129

def delete!(key)
  dump_hash = responsible_sites(key)
  dump_hash.t_each do |dump_id, nr_of_chunks_available|
    self.sites[dump_id][:service].delete!(key)
  end
end

#next_to?(service_id1, service_id2) ⇒ Boolean

Returns whether service_id1 and service_id2 would come in that order in the site array if both existed.

Returns:

  • (Boolean)


189
190
191
192
193
194
195
196
197
# File 'lib/archipelago/sanitation.rb', line 189

def next_to?(service_id1, service_id2)
  if self.sites.include?(service_id1)
    return get_least_greater_than(:sites, service_id1, 1).first[:service_id] <= service_id2
  elsif self.sites.include?(service_id2)
    return get_greatest_less_than(:sites, service_id2, 1).first[:service_id] >= service_id1
  else 
    return false
  end
end

#predecessor(service_id) ⇒ Object

Gets the predecessor of service_id in the array of services.



210
211
212
# File 'lib/archipelago/sanitation.rb', line 210

def predecessor(service_id)
  return get_greatest_less_than(:sites, service_id, 1).first[:service_id]
end

#redistribute(key) ⇒ Object

Ensures that all the dumps responsible for key has chunks for that key without changing the timestamp for key.



176
177
178
179
180
181
182
183
# File 'lib/archipelago/sanitation.rb', line 176

def redistribute(key)
  value, timestamp = fetch(key)
  #
  # Even if fetch didnt raise the exception we must, cause this is serious business.
  #
  raise NotEnoughDataException.new(self, key) if value.nil?
  self.[]=(key, value, timestamp)
end

#responsible_sites(key) ⇒ Object

Returns => nr_of_chunks_it_should_have where sum(nr_of_chunks_it_should_have) == n from self.sites having service_id > key.

Will loop to the beginning if the number of elements run out.



152
153
154
155
156
157
158
159
160
161
162
# File 'lib/archipelago/sanitation.rb', line 152

def responsible_sites(key)
  raise NoRemoteDatabaseAvailableException.new(self) if self.sites.empty?

  rval = {}
  rval.extend(Archipelago::Current::ThreadedCollection)
  get_least_greater_than(:sites, key, @minimum_nr_of_chunks).each do |desc|
    rval[desc[:service_id]] ||= 0
    rval[desc[:service_id]] += 1
  end
  return rval
end

#second_master_to(service_id) ⇒ Object

Returns the site after the first one that has keys that will be stored in the site identified by service_id.



203
204
205
# File 'lib/archipelago/sanitation.rb', line 203

def second_master_to(service_id)
  return get_greatest_less_than(:sites, service_id, @minimum_nr_of_chunks - 1).first[:service_id]
end

#setup(options = {}) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/archipelago/sanitation.rb', line 90

def setup(options = {})
  @minimum_recoverable_size = options[:minimum_recoverable_size] || MINIMUM_RECOVERABLE_SIZE
  @minimum_nr_of_chunks = options[:minimum_nr_of_chunks] || MINIMUM_NR_OF_CHUNKS
  @minimum_redundancy_ratio = options[:minimum_redundancy_ratio] || MINIMUM_REDUNDANCY_RATIO
  @metadata_overhead = options[:metadata_overhead] || METADATA_OVERHEAD

  options.merge!({
                   :service_descriptions => {
                     :sites => SITE_DESCRIPTION.merge(options[:site_description] || {})
                   }
                 })
  setup_client(options)
end