Class: Rollout::Zookeeper::DistributedHashtable
- Inherits: Object
  - Object
  - Rollout::Zookeeper::DistributedHashtable
- Defined in:
- lib/rollout/zookeeper/distributed_hashtable.rb
Instance Method Summary
- #[](key) ⇒ Object
- #[]=(key, value) ⇒ Object
- #configure_watches ⇒ Object
- #delete(key) ⇒ Object
- #each(&block) ⇒ Object
- #empty? ⇒ Boolean (also: #blank?)
- #fire ⇒ Object
- #has_key?(key) ⇒ Boolean
- #initialize(zk, path, options = {}) ⇒ DistributedHashtable (constructor): A new instance of DistributedHashtable.
- #length ⇒ Object
- #merge(other) ⇒ Object
- #on_change(&block) ⇒ Object
- #read ⇒ Object
- #to_h ⇒ Object
- #update(&block) ⇒ Object
- #update_exists(&block) ⇒ Object
- #update_initial(&block) ⇒ Object
Constructor Details
#initialize(zk, path, options = {}) ⇒ DistributedHashtable
Returns a new instance of DistributedHashtable.
6 7 8 9 10 11 12 13 14 |
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 6
#
# Builds a hashtable bound to +path+ on the +zk+ client, then registers the
# watches and performs the first read through #configure_watches.
#
# @param zk [Object] client the table uses for its reads, writes and watches
# @param path [String] node path the table is stored under
# @param options [Hash] optional settings
# @option options [#call] :on_error handler that receives exceptions the
#   table chooses not to propagate; defaults to a handler that does nothing
# @return [DistributedHashtable] a new instance of DistributedHashtable
def initialize(zk, path, options = {})
  @zk = zk
  @path = path
  @mutex = Mutex.new
  @callbacks = []
  # Fall back to a no-op handler so @on_error can always be called.
  @on_error = options[:on_error] || proc { |_ex| }
  configure_watches
end
Instance Method Details
#[](key) ⇒ Object
38 39 40 41 42 43 44 |
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 38
#
# Looks up +key+ in the locally cached table while holding the mutex.
#
# @param key [Object] key to look up
# @return [Object, nil] the cached value; nil when the key is absent or no
#   table has been loaded yet
def [](key)
  @mutex.synchronize { @hashtable[key] if @hashtable }
end
#[]=(key, value) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 46
#
# Stores +value+ under +key+ by running #update while holding the mutex, then
# notifies the change callbacks via #fire once the mutex is released.
#
# @param key [Object] key to assign
# @param value [Object] value to store
# @return [Object] whatever #update returned for the assignment block
def []=(key, value)
  outcome = @mutex.synchronize do
    update { |table| table[key] = value }
  end

  fire
  outcome
end
#configure_watches ⇒ Object
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 179
#
# Registers (once) a handler on the node path and a handler for client
# connection events, each of which re-reads the table, then performs an
# immediate read.
#
# Timeout and not-connected errors raised by that immediate read are handed to
# the error handler instead of being propagated.
#
# @return [Object] the result of the read, or of the error handler when one of
#   the listed exceptions was rescued
def configure_watches
  # ||= keeps the existing subscriptions when this method runs more than once.
  @register ||= @zk.register(@path) { read }
  @on_connected ||= @zk.on_connected { read }

  begin
    read
  rescue ::ZK::Exceptions::OperationTimeOut,
         ::Zookeeper::Exceptions::ContinuationTimeoutError,
         ::Zookeeper::Exceptions::NotConnected => e
    # Ignore these, we'll get them next time
    @on_error.call(e)
  end
end
#delete(key) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 66
#
# Removes +key+ by running #update while holding the mutex, then notifies the
# change callbacks via #fire once the mutex is released.
#
# @param key [Object] key to remove
# @return [Object] whatever #update returned for the deletion block
def delete(key)
  removed = @mutex.synchronize do
    update { |table| table.delete(key) }
  end

  fire
  removed
end
#each(&block) ⇒ Object
100 101 102 |
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 100
#
# Iterates over the hash returned by #to_h, forwarding +block+ to its #each.
#
# @return [Object] whatever the snapshot's #each returns
def each(&block)
  snapshot = to_h
  snapshot.each(&block)
end
#empty? ⇒ Boolean Also known as: blank?
114 115 116 |
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 114
#
# Reports whether #length is zero.
#
# @return [Boolean] true when #length is zero
def empty?
  length.zero?
end
#fire ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 24
#
# Invokes every registered change callback.
#
# The callback list is copied while holding the mutex and the callbacks are
# run after it is released. An exception raised by a callback is passed to the
# error handler and the remaining callbacks still run.
#
# @return [Array] the copied list of callbacks that was iterated
def fire
  snapshot = @mutex.synchronize { @callbacks.dup }

  snapshot.each do |callback|
    begin
      callback.call
    rescue => e
      @on_error.call(e)
    end
  end
end
#has_key?(key) ⇒ Boolean
58 59 60 61 62 63 64 |
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 58
#
# Checks whether +key+ is present in the locally cached table.
#
# @param key [Object] key to test
# @return [Boolean, nil] true/false from the cached table; nil when no table
#   has been loaded yet
def has_key?(key)
  @mutex.synchronize { @hashtable && @hashtable.has_key?(key) }
end
#length ⇒ Object
104 105 106 107 108 109 110 111 112 |
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 104
#
# Returns the number of entries in the locally cached table.
#
# @return [Integer] entry count; 0 when no table has been loaded yet
def length
  @mutex.synchronize { @hashtable ? @hashtable.length : 0 }
end
#merge(other) ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 78
#
# Merges the entries of +other+ into the table by running #update while
# holding the mutex, then notifies the change callbacks via #fire once the
# mutex is released.
#
# @param other [Hash] entries to merge in; its values win on key conflicts
# @return [Hash] a copy of the merged table
def merge(other)
  merged = @mutex.synchronize do
    update do |hashtable|
      # Hash#merge builds a new hash and leaves +hashtable+ untouched, so the
      # table handed back to #update would be unchanged. merge! mutates it in
      # place; the dup keeps the return value a separate object, as before.
      hashtable.merge!(other).dup
    end
  end

  fire
  merged
end
#on_change(&block) ⇒ Object
16 17 18 19 20 21 22 |
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 16
#
# Registers +block+ as a change callback and immediately invokes it once.
#
# The block is appended to the callback list while holding the mutex; the
# immediate invocation happens after the mutex is released.
#
# @return [Object] the result of the immediate invocation of +block+
def on_change(&block)
  @mutex.synchronize { @callbacks << block }
  block.call
end
#read ⇒ Object
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 119
#
# Reloads the cached table from the node at @path (requesting a watch), then
# notifies the change callbacks via #fire.
#
# When the node does not exist the cache is reset to an empty hash. If the
# existence check (which also requests a watch) reports that the node is
# there after all, the read is retried.
#
# @return [Object] the result of #fire
def read
  @mutex.synchronize do
    begin
      payload, _stat = @zk.get(@path, :watch => true)
      @hashtable = Yajl::Parser.parse(payload)
    rescue ::ZK::Exceptions::NoNode
      # The node may have appeared between the failed get and this check.
      retry if @zk.exists?(@path, :watch => true)
      @hashtable = {}
    end
  end

  fire
end
#to_h ⇒ Object
90 91 92 93 94 95 96 97 98 |
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 90
#
# Returns a shallow copy of the locally cached table.
#
# @return [Hash] a dup of the cached table; a new empty hash when no table has
#   been loaded yet
def to_h
  @mutex.synchronize { @hashtable ? @hashtable.dup : {} }
end
#update(&block) ⇒ Object
136 137 138 139 140 141 142 143 144 |
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 136
#
# Applies +block+ to the stored table, creating the node when it is missing.
#
# First tries #update_exists. If that raises NoNode it falls back to
# #update_initial; if that in turn raises NodeExists it goes back to
# #update_exists.
#
# @return [Object] the value returned by whichever helper succeeded
def update(&block)
  update_exists(&block)
rescue ::ZK::Exceptions::NoNode
  begin
    update_initial(&block)
  rescue ::ZK::Exceptions::NodeExists
    update_exists(&block)
  end
end
#update_exists(&block) ⇒ Object
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 146
#
# Read-modify-write against an existing node: fetches and parses the current
# contents, lets +block+ mutate the parsed table, and writes the encoded table
# back conditioned on the version that was read.
#
# On a BadVersion error the whole sequence is retried after a randomized pause
# of 0.1 to 1.1 seconds, so +block+ may be invoked more than once.
#
# @return [Object] the value returned by +block+ on the successful attempt
def update_exists(&block)
  raw, stat = @zk.get(@path, :watch => true)
  table = Yajl::Parser.parse(raw)
  outcome = block.call(table)

  @zk.set(@path, Yajl::Encoder.encode(table), :version => stat.version)
  # Only cache the table once the versioned write has gone through.
  @hashtable = table
  outcome
rescue ::ZK::Exceptions::BadVersion
  sleep(0.1 + rand)
  retry
end
#update_initial(&block) ⇒ Object
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/rollout/zookeeper/distributed_hashtable.rb', line 163
#
# Creates the node from scratch: lets +block+ populate a fresh empty hash and
# creates the node at @path with the encoded result.
#
# On a NoNode error from the create call the parent path is created with
# mkdir_p and the whole sequence is retried, so +block+ may be invoked more
# than once (each time on a new empty hash).
#
# @return [Object] the value returned by +block+ on the successful attempt
def update_initial(&block)
  table = {}
  outcome = block.call(table)

  @zk.create(@path, Yajl::Encoder.encode(table))
  @hashtable = table
  outcome
rescue ::ZK::Exceptions::NoNode
  @zk.mkdir_p(File.dirname(@path))
  retry
end