Class: Consul::Mutex
- Inherits:
-
Object
- Object
- Consul::Mutex
- Defined in:
- lib/consul/mutex.rb
Overview
A Consul-mediated distributed mutex
Sometimes, you just want some code to run on only one machine in a cluster at any particular time. Perhaps you only need one copy running, and you'd like to have something ready to failover, or maybe you want to make sure you don't take down all your machines simultaneously for a code upgrade.
Either way, Consul::Mutex
has got you covered.
Constant Summary collapse
- ThreadExceptionError =
Indicates something went wrong in the worker thread.
Catch this exception if you specifically want to know that your worker thread lost its mind and errored out. The actual exception which caused the thread to terminate is available in
#nested
. Class.new(Nesty::NestedStandardError)
- ConsulError =
Indicates some sort of problem communicating with Consul.
Something has gone terribly, terribly wrong, and I need to tell you all about it.
Class.new(RuntimeError)
- LostLockError =
Indicates that the worker thread was terminated because we lost the distributed lock.
You can't assume anything about what state anything is in that you haven't explicitly made true using
ensure
. In general, if you're getting this exception, something is doing terrible, terrible things to your Consul cluster. Class.new(RuntimeError)
Instance Method Summary collapse
-
#initialize(key, opts = {}) ⇒ Mutex
constructor
Create a new Consul-mediated distributed mutex.
-
#synchronize { ... } ⇒ Object
Run code under the protection of this mutex.
Constructor Details
#initialize(key, opts = {}) ⇒ Mutex
Create a new Consul-mediated distributed mutex.
100 101 102 103 104 |
# File 'lib/consul/mutex.rb', line 100 def initialize(key, opts = {}) @key = key @value = opts.fetch(:value, Socket.gethostname) @consul_url = URI(opts.fetch(:consul_url, 'http://localhost:8500')) end |
Instance Method Details
#synchronize { ... } ⇒ Object
Run code under the protection of this mutex.
This method works similarly to the Mutex#synchronize
method in the
Ruby stdlib. You pass it a block, and only one instance of the code
in the block will be running at any given moment.
The big difference is that the blocks of code being mutually excluded can be running in separate processes, on separate machines. We are truly living in the future.
The slightly smaller difference is that the block of code you specify
may not actually run, or it might be killed whilst running. You'll
always get an exception raised if that occurs, but you'll need to
be careful to clean up any state that your code sets using ensure
blocks (or equivalent).
Your block of code is also run on a separate thread within the
interpreter from the one in which #synchronize
itself is called (in
case that's important to you).
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 |
# File 'lib/consul/mutex.rb', line 149 def synchronize unless block_given? raise ArgumentError, "No block passed to #{self.inspect}#synchronize" end acquire_lock begin worker_thread = Thread.new { yield } watcher_thread = Thread.new { hold_lock } tw = ThreadsWait.new(worker_thread, watcher_thread) finished_thread = tw.next_wait err = nil case finished_thread when worker_thread # Work completed successfully... excellent return worker_thread.value when watcher_thread # We lost the lock... fiddlesticks # May as well delete our session now, it's useless delete_session k = watcher_thread.value msg = if k.nil? "Lost lock, key deleted!" else if k.session.nil? "Lost lock, no active session!" else "Lost lock to session '#{k.session}'" end end raise LostLockError, msg else raise RuntimeError, "Mysterious return value from `ThreadsWait#next_wait: #{finished_thread.inspect}" end ensure watcher_thread.kill worker_thread.kill begin worker_thread.join rescue Exception => ex worker_thread = ex end begin watcher_thread.join rescue Exception => ex watcher_thread = ex end release_lock if @session_id if worker_thread.is_a?(Exception) raise ThreadExceptionError.new( "Worker thread raised exception", worker_thread ) end if watcher_thread.is_a?(Exception) if watcher_thread.is_a?(ConsulError) raise watcher_thread else raise RuntimeError, "Watcher thread raised exception: " + "#{watcher_thread.} " + "(#{watcher_thread.class})" end end end end |