Class: Consul::Mutex

Inherits:
Object
  • Object
show all
Defined in:
lib/consul/mutex.rb

Overview

A Consul-mediated distributed mutex

Sometimes, you just want some code to run on only one machine in a cluster at any particular time. Perhaps you only need one copy running, and you'd like to have something ready to failover, or maybe you want to make sure you don't take down all your machines simultaneously for a code upgrade.

Either way, Consul::Mutex has got you covered.

Constant Summary collapse

ThreadExceptionError =

Indicates something went wrong in the worker thread.

Catch this exception if you specifically want to know that your worker thread lost its mind and errored out. The actual exception which caused the thread to terminate is available in #nested.

Class.new(Nesty::NestedStandardError)
ConsulError =

Indicates some sort of problem communicating with Consul.

Something has gone terribly, terribly wrong, and I need to tell you all about it.

Class.new(RuntimeError)
LostLockError =

Indicates that the worker thread was terminated because we lost the distributed lock.

You can't assume anything about what state anything is in that you haven't explicitly made true using ensure. In general, if you're getting this exception, something is doing terrible, terrible things to your Consul cluster.

Class.new(RuntimeError)

Instance Method Summary collapse

Constructor Details

#initialize(key, opts = {}) ⇒ Mutex

Create a new Consul-mediated distributed mutex.

Parameters:

  • key (String)

    the path (within the Consul KV store namespace, /v1/kv) which will be used as the lock key for all operations on this mutex. Every mutex created with the same key will exclude with all other mutexes created with the same key.

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :value (String) — default: hostname

    the value to set on the lock key when we acquire the lock. This can be anything you like, but generally you'll want to set something that uniquely identifies who is currently holding the lock. The default, the local system's hostname, is generally a good choice.

  • :consul_url (String) — default: 'http://localhost:8500'

    where to connect to in order to talk to your local Consul cluster.



100
101
102
103
104
# File 'lib/consul/mutex.rb', line 100

def initialize(key, opts = {})
	@key        = key
	@value      = opts.fetch(:value, Socket.gethostname)
	@consul_url = URI(opts.fetch(:consul_url, 'http://localhost:8500'))
end

Instance Method Details

#synchronize { ... } ⇒ Object

Run code under the protection of this mutex.

This method works similarly to the Mutex#synchronize method in the Ruby stdlib. You pass it a block, and only one instance of the code in the block will be running at any given moment.

The big difference is that the blocks of code being mutually excluded can be running in separate processes, on separate machines. We are truly living in the future.

The slightly smaller difference is that the block of code you specify may not actually run, or it might be killed whilst running. You'll always get an exception raised if that occurs, but you'll need to be careful to clean up any state that your code sets using ensure blocks (or equivalent).

Your block of code is also run on a separate thread within the interpreter from the one in which #synchronize itself is called (in case that's important to you).

Yields:

  • your code will be run once we have acquired the lock which controls this mutex.

Raises:

  • (ArgumentError)

    if no block is passed to this method.

  • (RuntimeError)

    if some sort of internal logic error occurs (always a bug in this code, please report)

  • (SocketError)

    if a mysterious socket-related error occurs.

  • (ConsulError)

    if an error occurs talking to Consul. This is indicative of a problem with the Consul agent, or the network.

  • (ThreadExceptionError)

    if the worker thread exits with an exception during execution. This is indicative of a problem in your code.

  • (LostLockError)

    if the Consul lock is somehow disrupted while the worker thread is running. This should only happen if the Consul cluster has a serious problem of some sort, or someone fiddles with the lock key behind our backs. Find who is fiddling with the key, and break their fingers.



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/consul/mutex.rb', line 149

def synchronize
	unless block_given?
		raise ArgumentError,
		      "No block passed to #{self.inspect}#synchronize"
	end

	acquire_lock
	
	begin
		worker_thread  = Thread.new { yield }
		watcher_thread = Thread.new { hold_lock }

		tw = ThreadsWait.new(worker_thread, watcher_thread)
		finished_thread = tw.next_wait

		err = nil

		case finished_thread
			when worker_thread
				# Work completed successfully... excellent
				return worker_thread.value

			when watcher_thread
				# We lost the lock... fiddlesticks
				
				# May as well delete our session now, it's useless
				delete_session

				k = watcher_thread.value
				
				msg = if k.nil?
					"Lost lock, key deleted!"
				else
					if k.session.nil?
						"Lost lock, no active session!"
					else
						"Lost lock to session '#{k.session}'"
					end
				end

				raise LostLockError, msg
			else
				raise RuntimeError,
				      "Mysterious return value from `ThreadsWait#next_wait: #{finished_thread.inspect}"
		end
	ensure
		watcher_thread.kill
		worker_thread.kill

		begin
			worker_thread.join
		rescue Exception => ex
			worker_thread = ex
		end

		begin
			watcher_thread.join
		rescue Exception => ex
			watcher_thread = ex
		end

		release_lock if @session_id

		if worker_thread.is_a?(Exception)
			raise ThreadExceptionError.new(
			        "Worker thread raised exception",
			        worker_thread
			      )
		end
		
		if watcher_thread.is_a?(Exception)
			if watcher_thread.is_a?(ConsulError)
				raise watcher_thread
			else
				raise RuntimeError,
				      "Watcher thread raised exception: " +
				      "#{watcher_thread.message} " +
				      "(#{watcher_thread.class})"
			end
		end
	end
end