Class: CRDT::ORSet
Overview
Observed-Removed Set
This CRDT allows items to be added and removed. When an item is added, it is stored along with a unique token. When an item is removed, all tokens currently observed for that item are marked as removed. This implementation of an ORSet keeps a single record for each item, in which removed tokens are moved from an “observed” set to a “removed” set.
Efficiency, where n is the number of items, m the number of nodes, and k the number of operations:
- Space efficiency: O(k)
- Space efficiency with garbage collection: O(n)
- Adding an item: O(1)
- Removing an item: O(k) in the degenerate case, typically closer to O(1)
- Testing if an item is in the set: O(1)
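The observed/removed bookkeeping described above can be sketched in a few lines. This is a stripped-down illustration, not the library's class: the name TinyORSet is hypothetical, and it assumes tokens of the form "node:counter".

```ruby
# Hypothetical, stripped-down model of the per-item record described above.
# TinyORSet is an illustrative name, not part of the documented library.
class TinyORSet
  attr_reader :items

  def initialize(node_identity)
    @node_identity = node_identity
    @token_counter = 0
    @items = {}
  end

  # Adding records a fresh, unique token under :observed.
  def add(item)
    token = "#{@node_identity}:#{@token_counter}"
    @token_counter += 1
    record = (@items[item] ||= { observed: [], removed: [] })
    record[:observed] << token
    token
  end

  # Removing moves every currently observed token to :removed.
  def remove(item)
    record = @items[item]
    return unless record
    record[:removed] += record[:observed]
    record[:observed] = []
  end

  # An item is present while at least one observed token remains.
  def has?(item)
    record = @items[item]
    !record.nil? && !record[:observed].empty?
  end
end

set = TinyORSet.new("a")
set.add("apple")    # observed: ["a:0"]
set.remove("apple") # "a:0" moves to removed
set.add("apple")    # a new token "a:1" makes the item present again
```

Because the second add uses a new token, the earlier removal does not hide it; only tokens that were observed at removal time are retired.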
Instance Attribute Summary
-
#items ⇒ Object
Returns the value of attribute items.
-
#token_counter ⇒ Object
Returns the value of attribute token_counter.
Class Method Summary
-
.from_h(hash) ⇒ Object
Create an ORSet from a hash, such as one deserialized from JSON.
Instance Method Summary
-
#add(item) ⇒ Object
Add an item to this set.
- #each ⇒ Object
-
#gc(node_to_collect, until_counter) ⇒ Object
Garbage collect all tokens originating from the given node whose counter is at most the given counter.
-
#has?(item) ⇒ Boolean
Check if this item is in the set.
-
#initialize(node_identity = Thread.current.object_id, token_counter = 0) ⇒ ORSet
constructor
Create a new, empty set.
-
#merge(other) ⇒ Object
Perform a one-way merge, bringing changes from the other ORSet provided.
-
#remove(item) ⇒ Object
Mark an item as removed from the set.
-
#to_h ⇒ Object
Get a hash representation of this set, suitable for serialization to JSON.
Constructor Details
#initialize(node_identity = Thread.current.object_id, token_counter = 0) ⇒ ORSet
Create a new, empty set
# File 'lib/crdt/or_set.rb', line 16
def initialize(node_identity = Thread.current.object_id, token_counter = 0)
  @node_identity = node_identity
  @token_counter = token_counter
  @items = {}
end
Instance Attribute Details
#items ⇒ Object
Returns the value of attribute items.
# File 'lib/crdt/or_set.rb', line 22
def items
  @items
end
#token_counter ⇒ Object
Returns the value of attribute token_counter.
# File 'lib/crdt/or_set.rb', line 22
def token_counter
  @token_counter
end
Class Method Details
.from_h(hash) ⇒ Object
Create an ORSet from a hash, such as one deserialized from JSON
# File 'lib/crdt/or_set.rb', line 69
def self.from_h(hash)
  set = ORSet.new(hash["node_identity"], hash["token_counter"])
  hash["items"].each do |item, record|
    # JSON parsing yields string keys; symbol keys are accepted as a fallback
    observed = record["observed"] || record[:observed] || []
    removed = record["removed"] || record[:removed] || []
    set.items[item] = { observed: observed.dup, removed: removed.dup }
  end
  set
end
Instance Method Details
#add(item) ⇒ Object
Add an item to this set
# File 'lib/crdt/or_set.rb', line 44
def add(item)
  # the token in this implementation is "better", since it's easier for us to parse/garbage collect
  token = "#{@node_identity}:#{@token_counter}"
  @token_counter += 1
  @items[item] ||= { observed: [], removed: [] }
  @items[item][:observed] << token
end
#each ⇒ Object
# File 'lib/crdt/or_set.rb', line 31
def each
  if block_given?
    @items.each do |item, record|
      next if record[:observed].empty?
      yield item
    end
  else
    return to_enum
  end
end
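The iteration pattern used by #each (yield only items with at least one observed token, or return an enumerator when no block is given) can be tried in isolation. The VisibleItems wrapper below is hypothetical, and including Enumerable is an assumption made for the example; this page does not state whether ORSet does so.

```ruby
# Hypothetical wrapper showing the block_given?/to_enum pattern on plain records.
class VisibleItems
  include Enumerable # assumption for this sketch only

  def initialize(items)
    @items = items
  end

  def each
    # Without a block, hand back an Enumerator over this same method.
    return to_enum unless block_given?

    @items.each do |item, record|
      next if record[:observed].empty? # removed items are skipped
      yield item
    end
  end
end

items = {
  "apple" => { observed: ["a:1"], removed: ["a:0"] },
  "pear"  => { observed: [], removed: ["a:2"] },
}
visible = VisibleItems.new(items)
visible.to_a # only "apple" has an observed token left
```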
#gc(node_to_collect, until_counter) ⇒ Object
Garbage collect all tokens originating from the given node whose counter is at most the given counter.
This should be called only when partial consensus can be ascertained for the system.
# File 'lib/crdt/or_set.rb', line 96
def gc(node_to_collect, until_counter)
  # matches tokens issued by the given node with a counter at or below the threshold
  match_proc = proc do |token|
    node, counter = token.split(":")
    node == node_to_collect.to_s && counter.to_i <= until_counter
  end

  @items.each do |item, record|
    # remove any removal records, since the system has reached consensus up to this node's counter
    record[:removed].reject!(&match_proc)

    # squash all the observed tokens into one
    # This is potentially unnecessary so long as at most one active observed token is recorded per node
    tokens = record[:observed].select(&match_proc).sort_by do |token|
      token.split(":").last.to_i
    end
    tokens.pop # the matching token with the highest counter survives
    record[:observed] -= tokens
  end
end
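The token matching at the heart of gc can be examined on its own. The sketch below assumes tokens of the form "node:counter" and node identities that contain no colon; parse_token and collectable are hypothetical helper names, not part of the library.

```ruby
# Hypothetical helper: split a "node:counter" token into its parts.
# Assumes the node identity itself contains no ":".
def parse_token(token)
  node, counter = token.split(":")
  [node, counter.to_i]
end

# Hypothetical helper: build a predicate that selects tokens issued by `node`
# whose counter is at or below `until_counter`.
def collectable(node, until_counter)
  proc do |token|
    token_node, counter = parse_token(token)
    token_node == node.to_s && counter <= until_counter
  end
end

removed = ["a:0", "a:3", "b:1", "a:7"]
match = collectable("a", 3)

# Pass the proc itself with &match; &:match would instead call a method
# named `match` on each token.
removed.reject!(&match)
```

After the call, only the tokens from other nodes or with a counter above the threshold remain in the list.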
#has?(item) ⇒ Boolean
Check if this item is in the set
# File 'lib/crdt/or_set.rb', line 25
def has?(item)
  tokens = @items[item]
  return false unless tokens
  return !tokens[:observed].empty?
end
#merge(other) ⇒ Object
Perform a one-way merge, bringing changes from the other ORSet provided
# File 'lib/crdt/or_set.rb', line 84
def merge(other)
  other.items.each do |item, record|
    @items[item] ||= { observed: [], removed: [] }
    @items[item][:observed] |= record[:observed]
    @items[item][:removed] |= record[:removed]
    @items[item][:observed] -= @items[item][:removed]
  end
end
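To see why a concurrent add survives a removal, the same merge steps can be applied to plain hashes. This is a sketch under stated assumptions: merge_records is a hypothetical helper that mirrors the method body above, and the replica contents are made up for illustration.

```ruby
# Hypothetical helper mirroring the merge steps on plain item records.
def merge_records(mine, theirs)
  theirs.each do |item, record|
    mine[item] ||= { observed: [], removed: [] }
    mine[item][:observed] |= record[:observed] # union of observed tokens
    mine[item][:removed]  |= record[:removed]  # union of removed tokens
    mine[item][:observed] -= mine[item][:removed] # removed tokens win over stale observations
  end
  mine
end

# Replica A has removed "apple" (retiring token a:0).
replica_a = { "apple" => { observed: [], removed: ["a:0"] } }
# Replica B still sees a:0 and has concurrently added "apple" again as b:0.
replica_b = { "apple" => { observed: ["a:0", "b:0"], removed: [] } }

merged = merge_records(replica_a, replica_b)
```

Only a:0 was observed when the removal happened, so the merged record keeps b:0 as observed and the item stays in the set. Merging the same state again leaves the record unchanged, which is the idempotence a state-based CRDT relies on.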
#remove(item) ⇒ Object
Mark an item as removed from the set
# File 'lib/crdt/or_set.rb', line 54
def remove(item)
  record = @items[item]
  return unless record # nothing to remove if the item was never added
  record[:removed] += record[:observed]
  record[:observed] = []
end
#to_h ⇒ Object
Get a hash representation of this set, suitable for serialization to JSON
# File 'lib/crdt/or_set.rb', line 60
def to_h
  return {
    node_identity: @node_identity,
    token_counter: @token_counter,
    items: @items,
  }
end
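When this hash goes through JSON, its symbol keys come back as strings, which matters for any code that reads the result. The sketch below uses only the standard json library; the snapshot literal is a made-up example shaped like the output of to_h.

```ruby
require "json"

# Made-up snapshot with the same shape as the hash returned by to_h.
snapshot = {
  node_identity: "a",
  token_counter: 2,
  items: { "apple" => { observed: ["a:1"], removed: ["a:0"] } },
}

json = JSON.generate(snapshot)

# By default, every key is parsed back as a String.
restored = JSON.parse(json)

# With symbolize_names: true, every key becomes a Symbol, including item keys.
restored_sym = JSON.parse(json, symbolize_names: true)
```

In the default form the record is read as restored["items"]["apple"]["observed"], while a lookup with :observed returns nil. Items that are not strings are also stringified by the round trip, so sets of other value types need their own conversion.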