Class: CRDT::ORSet

Inherits:
Object
Includes:
Enumerable
Defined in:
lib/crdt/or_set.rb

Overview

Observed-Removed Set

This CRDT allows items to be added and removed. When an item is added, it is stored along with a unique token. When an item is removed, all tokens observed for that item are marked as removed. This implementation of an ORSet keeps a unified record for each item, in which removed tokens are moved from an “observed” set to a “removed” set.
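
As a rough illustration of the bookkeeping described above, the following sketch tracks one item's record with plain Ruby hashes. The helper names `sketch_add` and `sketch_remove` and the node name "node-a" are hypothetical and are not part of CRDT::ORSet; the record layout is the one used in the listings on this page.

```ruby
# Hypothetical helpers mirroring the observed/removed bookkeeping; not the library's API.
def sketch_add(items, item, node, counter)
  record = (items[item] ||= { observed: [], removed: [] })
  record[:observed] << "#{node}:#{counter}" # every add gets a fresh token
  items
end

def sketch_remove(items, item)
  record = items[item]
  return items unless record # nothing observed, nothing to remove

  record[:removed] += record[:observed] # move every token seen so far
  record[:observed] = []
  items
end

items = {}
sketch_add(items, "apple", "node-a", 0)
sketch_add(items, "apple", "node-a", 1)
sketch_remove(items, "apple")
sketch_add(items, "apple", "node-a", 2)

# The record now holds one live token and two removed ones.
p items["apple"]
```

An item counts as present while its "observed" list is non-empty, so the re-added "apple" is in the set even though two earlier tokens were removed.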

Efficiency (n: number of items, m: number of nodes, k: number of operations):

  • Space efficiency: O(k)
  • Space efficiency with garbage collection: O(n)
  • Adding an item: O(1)
  • Removing an item: O(k) in the degenerate case, typically closer to O(1)
  • Testing if an item is in the set: O(1)

Constructor Details

#initialize(node_identity = Thread.current.object_id, token_counter = 0) ⇒ ORSet

Create a new, empty set



# File 'lib/crdt/or_set.rb', line 16

def initialize(node_identity = Thread.current.object_id, token_counter = 0)
  @node_identity = node_identity
  @token_counter = token_counter
  @items = {}
end

Instance Attribute Details

#items ⇒ Object

Returns the value of attribute items.



# File 'lib/crdt/or_set.rb', line 22

def items
  @items
end

#token_counter ⇒ Object

Returns the value of attribute token_counter.



# File 'lib/crdt/or_set.rb', line 22

def token_counter
  @token_counter
end

Class Method Details

.from_h(hash) ⇒ Object

Create an ORSet from a hash, such as one deserialized from JSON



# File 'lib/crdt/or_set.rb', line 69

def self.from_h(hash)
  set = ORSet.new(hash["node_identity"], hash["token_counter"])

  # A hash parsed from JSON has string keys at every level,
  # so the nested records are read with string keys as well
  hash["items"].each do |item, record|
    set.items[item] = {observed: [], removed: []}
    set.items[item][:observed] += record["observed"]
    set.items[item][:removed] += record["removed"]
  end

  return set
end
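
JSON has no symbol type, so a hash shaped like the output of #to_h comes back from `JSON.parse` with string keys at every level, which is the shape .from_h has to read. The sketch below shows this with Ruby's standard `json` library; the snapshot contents (node name and tokens) are made up for illustration.

```ruby
require "json"

# A hash shaped like the output of #to_h (symbol keys), with made-up tokens.
snapshot = {
  node_identity: "node-a",
  token_counter: 2,
  items: { "apple" => { observed: ["node-a:1"], removed: ["node-a:0"] } },
}

parsed = JSON.parse(JSON.generate(snapshot))

p parsed.keys                        # => ["node_identity", "token_counter", "items"]
p parsed["items"]["apple"].keys      # => ["observed", "removed"]
p parsed["items"]["apple"][:observed] # => nil, the symbol key no longer exists
```

Item keys are also converted to strings on the way through JSON, so a set that holds non-string items will not round-trip unchanged.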

Instance Method Details

#add(item) ⇒ Object

Add an item to this set



# File 'lib/crdt/or_set.rb', line 44

def add(item)
  # Tokens have the form "node_identity:counter", which makes them easy to parse and garbage collect
  token = "#{@node_identity}:#{@token_counter}"
  @token_counter += 1

  @items[item] ||= { observed: [], removed: []}
  @items[item][:observed] << token
end

#each ⇒ Object



# File 'lib/crdt/or_set.rb', line 31

def each
  if block_given?
    @items.each do |item, record|
      next if record[:observed].empty?
      yield item
    end
  else
    return to_enum
  end
end
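
Because the class includes Enumerable, defining #each this way is enough to get methods such as `to_a` and `include?`. The sketch below uses a hypothetical stand-in class, `SketchSet`, that copies only the #each shape shown above and takes its records directly in the constructor; it is not the library class.

```ruby
# Hypothetical stand-in with the same #each shape as shown above; not the library class.
class SketchSet
  include Enumerable

  def initialize(items)
    @items = items
  end

  def each
    return to_enum(:each) unless block_given?

    @items.each do |item, record|
      next if record[:observed].empty? # removed items keep a record but are skipped
      yield item
    end
  end
end

set = SketchSet.new({
  "apple"  => { observed: ["node-a:0"], removed: [] },
  "banana" => { observed: [], removed: ["node-a:1"] },
})

p set.to_a               # => ["apple"]
p set.include?("banana") # => false
p set.each.next          # => "apple"
```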

#gc(node_to_collect, until_counter) ⇒ Object

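Garbage collect all tokens originating from the given node whose counter is less than or equal to the given counter.

Call this only when consensus up to that counter can be ascertained across the system, because a collected removal record can no longer suppress a stale token that arrives in a later merge.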



# File 'lib/crdt/or_set.rb', line 96

def gc(node_to_collect, until_counter)
  # Tokens are strings, so compare the node part against the string form of the identity
  match_proc = proc do |token|
    node, counter = token.split(":")
    node == node_to_collect.to_s && counter.to_i <= until_counter
  end

  @items.each_value do |record|
    # remove any removal records, since the system has reached consensus up to this node's counter
    record[:removed].reject!(&match_proc)

    # squash all the matching observed tokens into one, keeping the newest
    # This is potentially unnecessary so long as at most one active observed token is recorded per node
    tokens = record[:observed].select(&match_proc).sort_by { |token| token.split(":").last.to_i }
    tokens.pop
    record[:observed] -= tokens
  end
end
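
The effect of collection on a single record can be sketched in isolation, assuming tokens of the form "node:counter" as produced by #add. The function name `sketch_gc` and the sample tokens are hypothetical; this is an illustration of the intended behaviour described above, not the library's code.

```ruby
# Hypothetical standalone version of the collection step for a single record.
def sketch_gc(record, node_to_collect, until_counter)
  collectable = lambda do |token|
    node, counter = token.split(":")
    node == node_to_collect.to_s && counter.to_i <= until_counter
  end

  # Removal records covered by the consensus point are no longer needed.
  record[:removed].reject!(&collectable)

  # Keep only the newest collectable observed token; drop the older ones.
  older = record[:observed].select(&collectable).sort_by { |t| t.split(":").last.to_i }
  older.pop
  record[:observed] -= older
  record
end

record = {
  observed: ["a:3", "a:5", "a:9", "b:1"],
  removed:  ["a:0", "a:1", "b:0"],
}

sketch_gc(record, "a", 5)
p record[:observed] # => ["a:5", "a:9", "b:1"]
p record[:removed]  # => ["b:0"]
```

Tokens from other nodes, and tokens above the counter, are left untouched, so collection for one node never changes whether an item is present.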

#has?(item) ⇒ Boolean

Check if this item is in the set

Returns:

  • (Boolean)


# File 'lib/crdt/or_set.rb', line 25

def has?(item)
  tokens = @items[item]
  return false unless tokens
  return ! tokens[:observed].empty?
end

#merge(other) ⇒ Object

Perform a one-way merge, bringing changes from the other ORSet provided

Parameters:

  • other (ORSet)



# File 'lib/crdt/or_set.rb', line 84

def merge(other)
  other.items.each do |item, record|
    @items[item] ||= {observed: [], removed: []}
    @items[item][:observed] |= record[:observed]
    @items[item][:removed] |= record[:removed]
    @items[item][:observed] -= @items[item][:removed]
  end
end
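
Since the merge is one-way, two replicas converge only after each has merged the other. The sketch below walks through a concurrent remove and re-add using a condensed, hypothetical stand-in class, `SketchORSet`, which reproduces the #add, #remove, #merge and #has? logic from this page so that the example runs on its own; the node names "a" and "b" are made up.

```ruby
# Condensed, hypothetical stand-in reproducing #add, #remove, #merge and #has? from this page.
class SketchORSet
  attr_reader :items

  def initialize(node_identity)
    @node_identity = node_identity
    @token_counter = 0
    @items = {}
  end

  def add(item)
    token = "#{@node_identity}:#{@token_counter}"
    @token_counter += 1
    (@items[item] ||= { observed: [], removed: [] })[:observed] << token
  end

  def remove(item)
    record = @items[item]
    return unless record

    record[:removed] += record[:observed]
    record[:observed] = []
  end

  def merge(other)
    other.items.each do |item, record|
      mine = (@items[item] ||= { observed: [], removed: [] })
      mine[:observed] |= record[:observed]
      mine[:removed] |= record[:removed]
      mine[:observed] -= mine[:removed] # removed tokens never stay observed
    end
  end

  def has?(item)
    record = @items[item]
    !record.nil? && !record[:observed].empty?
  end
end

a = SketchORSet.new("a")
b = SketchORSet.new("b")

a.add("apple")
b.merge(a)        # b now knows token "a:0"
b.remove("apple") # b removes only the tokens it has observed
a.add("apple")    # concurrent re-add on a creates token "a:1"

a.merge(b)
b.merge(a)

p a.has?("apple") # => true
p b.has?("apple") # => true
```

The removal on b covers only token "a:0", so the concurrent add on a survives the merge and both replicas end up with the same record.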

#remove(item) ⇒ Object

Mark an item as removed from the set



# File 'lib/crdt/or_set.rb', line 54

def remove(item)
  record = @items[item]
  # Removing an item that was never added is a no-op
  return unless record

  record[:removed] += record[:observed]
  record[:observed] = []
end

#to_h ⇒ Object

Get a hash representation of this set, suitable for serialization to JSON



# File 'lib/crdt/or_set.rb', line 60

def to_h
  return {
    node_identity: @node_identity,
    token_counter: @token_counter,
    items: @items,
  }
end