Class: Rollout::Zookeeper::DistributedHashtable

Inherits:
Object
  • Object
show all
Defined in:
lib/rollout/zookeeper/distributed_hashtable.rb

Instance Method Summary

Constructor Details

#initialize(zk, path, options = {}) ⇒ DistributedHashtable

Returns a new instance of DistributedHashtable.



6
7
8
9
10
11
12
13
14
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 6

# Builds a hashtable bound to a single node path and immediately wires up
# its watches via configure_watches.
#
# @param zk [Object] client used for every node operation in this class
# @param path [String] path of the node that stores the encoded table
# @param options [Hash] optional settings
# @option options [#call] :on_error invoked with the exception whenever a
#   callback or the initial read fails; defaults to a handler that does nothing
def initialize(zk, path, options = {})
  noop_handler = proc { |_error| }

  @zk = zk
  @path = path
  @mutex = Mutex.new
  @callbacks = []
  @on_error = options[:on_error] || noop_handler

  configure_watches
end

Instance Method Details

#[](key) ⇒ Object



38
39
40
41
42
43
44
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 38

# Looks up +key+ in the locally cached table while holding the mutex.
#
# @param key [Object] key to look up
# @return [Object, nil] the cached value, or nil when the key is absent or
#   no table has been cached yet
def [](key)
  @mutex.synchronize { @hashtable[key] if @hashtable }
end

#[]=(key, value) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 46

# Stores +value+ under +key+ by running the assignment through update
# while holding the mutex, then notifies the registered callbacks once the
# mutex has been released.
#
# @param key [Object] key to assign
# @param value [Object] value to store
# @return [Object] whatever update returned for the assignment block
def []=(key, value)
  stored = @mutex.synchronize do
    update { |table| table[key] = value }
  end

  # fire takes the mutex itself, so it has to run outside the block above.
  fire
  stored
end

#configure_watches ⇒ Object



179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 179

# Registers the node and connection callbacks (once each, memoized in
# @register and @on_connected) and then performs an initial read.
#
# Timeout and not-connected errors raised by that initial read are not
# re-raised: they are handed to the error handler, since a later callback
# is expected to trigger another read. Any other exception propagates.
#
# @return [Object] the result of read, or of the error handler when the
#   initial read failed with one of the rescued errors
def configure_watches
  @register ||= @zk.register(@path) { read }
  @on_connected ||= @zk.on_connected { read }

  begin
    read
  rescue ::ZK::Exceptions::OperationTimeOut,
         ::Zookeeper::Exceptions::ContinuationTimeoutError,
         ::Zookeeper::Exceptions::NotConnected => error
    # Deliberately swallowed; only reported.
    @on_error.call(error)
  end
end

#delete(key) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 66

# Removes +key+ by running the deletion through update while holding the
# mutex, then notifies the registered callbacks after the mutex is released.
#
# @param key [Object] key to remove
# @return [Object] whatever update returned for the deletion block
def delete(key)
  removed = @mutex.synchronize do
    update { |table| table.delete(key) }
  end

  # fire takes the mutex itself, so it has to run outside the block above.
  fire
  removed
end

#each(&block) ⇒ Object



100
101
102
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 100

# Iterates over a snapshot of the table obtained from to_h, so the block
# runs against a copy rather than the cached table.
#
# @yield [key, value] each pair of the snapshot
# @return [Object] whatever Hash#each returns for the snapshot
def each(&block)
  snapshot = to_h
  snapshot.each(&block)
end

#empty? ⇒ Boolean (also known as: blank?)

Returns:

  • (Boolean)


114
115
116
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 114

# Reports whether the table currently holds no entries, based on length.
#
# @return [Boolean] true when length is zero
def empty?
  length.zero?
end

#fire ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 24

# Invokes every registered change callback.
#
# The callback list is copied under the mutex and the callbacks are then
# run outside it. A callback that raises a StandardError does not stop the
# remaining ones; the exception is passed to the error handler instead.
#
# @return [Array] the copied list of callbacks that was iterated
def fire
  snapshot = @mutex.synchronize { @callbacks.dup }

  snapshot.each do |listener|
    begin
      listener.call
    rescue StandardError => error
      @on_error.call(error)
    end
  end
end

#has_key?(key) ⇒ Boolean

Returns:

  • (Boolean)


58
59
60
61
62
63
64
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 58

# Reports whether +key+ is present in the locally cached table.
#
# Always answers with a real boolean: when no table has been cached yet the
# result is false rather than nil, matching the documented Boolean return.
#
# @param key [Object] key to test
# @return [Boolean] true when the cached table contains the key
def has_key?(key)
  @mutex.synchronize do
    @hashtable ? @hashtable.key?(key) : false
  end
end

#length ⇒ Object



104
105
106
107
108
109
110
111
112
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 104

# Number of entries in the locally cached table, read under the mutex.
#
# @return [Integer] the cached table's length, or 0 when nothing is cached
def length
  @mutex.synchronize { @hashtable ? @hashtable.length : 0 }
end

#merge(other) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 78

# Merges +other+ into the stored table and notifies the change callbacks.
#
# The merge is applied in place (merge!) to the table that update hands to
# the block, so the merged contents are what update goes on to persist and
# cache. A non-destructive Hash#merge here would leave that table untouched
# and the write would be a no-op.
#
# @param other [Hash] entries to add or overwrite
# @return [Hash] a copy of the merged table, independent of the cached one
def merge(other)
  merged = @mutex.synchronize do
    update do |hashtable|
      hashtable.merge!(other)
      hashtable.dup
    end
  end

  # fire takes the mutex itself, so it has to run outside the block above.
  fire
  merged
end

#on_change(&block) ⇒ Object



16
17
18
19
20
21
22
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 16

# Registers +block+ as a change callback and invokes it once right away.
#
# The block is appended to the callback list under the mutex; the immediate
# invocation happens outside it and any exception it raises propagates to
# the caller.
#
# @yield called with no arguments now and on every later fire
# @return [Object] the result of the immediate invocation
def on_change(&block)
  @mutex.synchronize { @callbacks.push(block) }
  block.call
end

#read ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 119

# Refreshes the cached table from the node and then fires the callbacks.
#
# Under the mutex the node is fetched (passing watch: true, presumably to
# re-arm the watch - confirm against the client) and its payload parsed.
# If the fetch reports NoNode, existence is checked: when the node exists
# by then the fetch is retried, otherwise the cache becomes an empty Hash.
#
# @return [Object] the result of fire
def read
  @mutex.synchronize do
    begin
      payload, _stat = @zk.get(@path, watch: true)
      @hashtable = Yajl::Parser.parse(payload)
    rescue ::ZK::Exceptions::NoNode
      # The node may have been created between the failed get and this check.
      retry if @zk.exists?(@path, watch: true)

      @hashtable = {}
    end
  end

  fire
end

#to_h ⇒ Object



90
91
92
93
94
95
96
97
98
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 90

# Returns a shallow copy of the locally cached table, taken under the mutex.
#
# @return [Hash] a dup of the cached table, or a new empty Hash when
#   nothing is cached
def to_h
  @mutex.synchronize { @hashtable ? @hashtable.dup : {} }
end

#update(&block) ⇒ Object



136
137
138
139
140
141
142
143
144
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 136

# Applies +block+ to the stored table, creating the node when it is missing.
#
# First tries update_exists. If that reports NoNode it falls back to
# update_initial; if the creation then reports NodeExists it goes back to
# update_exists one more time.
#
# @yield [hashtable] the table to modify
# @return [Object] the result of whichever helper completed
def update(&block)
  update_exists(&block)
rescue ::ZK::Exceptions::NoNode
  begin
    update_initial(&block)
  rescue ::ZK::Exceptions::NodeExists
    update_exists(&block)
  end
end

#update_exists(&block) ⇒ Object



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 146

# Read-modify-write of a node that already exists.
#
# Fetches and parses the current payload, lets +block+ modify the parsed
# table, writes the encoded table back conditioned on the version that was
# read, and caches the table. On BadVersion it sleeps between 0.1 and 1.1
# seconds and repeats the whole cycle, so +block+ may run more than once.
#
# @yield [hashtable] the freshly parsed table to modify
# @return [Object] the block's result from the attempt that was written
def update_exists(&block)
  payload, stat = @zk.get(@path, watch: true)
  table = Yajl::Parser.parse(payload)

  outcome = block.call(table)

  @zk.set(@path, Yajl::Encoder.encode(table), version: stat.version)
  @hashtable = table

  outcome
rescue ::ZK::Exceptions::BadVersion
  # Randomised pause before starting the method body over.
  sleep 0.1 + rand
  retry
end

#update_initial(&block) ⇒ Object



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 163

# Creates the node from scratch.
#
# Runs +block+ against a new empty table, creates the node with the encoded
# result and caches the table. If creation reports NoNode, the parent path
# is created with mkdir_p and the whole method body is retried, so +block+
# may run more than once.
#
# @yield [hashtable] the new empty table to populate
# @return [Object] the block's result from the attempt that was written
def update_initial(&block)
  table = {}

  outcome = block.call(table)

  @zk.create(@path, Yajl::Encoder.encode(table))
  @hashtable = table

  outcome
rescue ::ZK::Exceptions::NoNode
  @zk.mkdir_p(File.dirname(@path))
  retry
end