Class: Consul::Mutex

Inherits:
Object
Defined in:
lib/consul/mutex.rb

Overview

A Consul-mediated distributed mutex

Sometimes, you just want some code to run on only one machine in a cluster at any particular time. Perhaps you only need one copy running, and you'd like to have something ready to fail over, or maybe you want to make sure you don't take down all your machines simultaneously for a code upgrade.

Either way, Consul::Mutex has got you covered.

Constant Summary

ThreadExceptionError = Class.new(Nesty::NestedStandardError)

Indicates something went wrong in the worker thread.

Catch this exception if you specifically want to know that your worker thread lost its mind and errored out. The actual exception which caused the thread to terminate is available in #nested.

ConsulError = Class.new(RuntimeError)

Indicates some sort of problem communicating with Consul.

Something has gone terribly, terribly wrong, and I need to tell you all about it.

LostLockError = Class.new(RuntimeError)

Indicates that the worker thread was terminated because we lost the distributed lock.

You can't assume anything about the state of anything that you haven't explicitly guaranteed using ensure. In general, if you're getting this exception, something is doing terrible, terrible things to your Consul cluster.

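Instance Method Summary

  • #initialize(key, opts = {}) ⇒ Mutex

    Create a new Consul-mediated distributed mutex.

  • #synchronize { ... } ⇒ Object

    Run code under the protection of this mutex.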

Constructor Details

#initialize(key, opts = {}) ⇒ Mutex

Create a new Consul-mediated distributed mutex.

Parameters:

  • key (String)

    the path (within the Consul KV store namespace, /v1/kv) which will be used as the lock key for all operations on this mutex. All mutexes created with the same key exclude one another.

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :value (String) — default: hostname

    the value to set on the lock key when we acquire the lock. This can be anything you like, but generally you'll want to set something that uniquely identifies who is currently holding the lock. The default, the local system's hostname, is generally a good choice.

  • :consul_url (String) — default: 'http://localhost:8500'

    the URL to connect to in order to talk to your local Consul cluster.



# File 'lib/consul/mutex.rb', line 100

def initialize(key, opts = {})
  @key        = key
  @value      = opts.fetch(:value, Socket.gethostname)
  @consul_url = URI(opts.fetch(:consul_url, 'http://localhost:8500'))
end
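To see how these defaults resolve, here is a minimal sketch that mirrors the constructor's option handling outside the class. The helper name `resolve_mutex_options` is hypothetical and not part of the library; it only reuses the `Hash#fetch`, `Socket.gethostname`, and `URI` calls shown in the listing above.

```ruby
require 'socket'
require 'uri'

# Hypothetical helper (not part of Consul::Mutex): mirrors the option
# handling in #initialize so the defaults can be inspected on their own.
def resolve_mutex_options(opts = {})
  {
    # Falls back to this machine's hostname when :value is not given.
    value:      opts.fetch(:value, Socket.gethostname),
    # Falls back to a Consul agent on localhost when :consul_url is not given.
    consul_url: URI(opts.fetch(:consul_url, 'http://localhost:8500'))
  }
end

defaults = resolve_mutex_options
puts defaults[:value]
puts defaults[:consul_url].host
puts defaults[:consul_url].port

# 'worker-1' is an illustrative value, not a convention of the library.
custom = resolve_mutex_options(value: 'worker-1')
puts custom[:value]
```

Because the options are read with `fetch` rather than `||`, a key that is present with a `nil` value is used as-is: passing `value: nil` stores `nil` instead of falling back to the hostname.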

Instance Method Details

#synchronize { ... } ⇒ Object

Run code under the protection of this mutex.

This method works similarly to the Mutex#synchronize method in the Ruby stdlib. You pass it a block, and only one instance of the code in the block will be running at any given moment.

The big difference is that the blocks of code being mutually excluded can be running in separate processes, on separate machines. We are truly living in the future.

The slightly smaller difference is that the block of code you specify might never run, or it might be killed whilst running. An exception is always raised if that occurs, but you'll need to use ensure blocks (or equivalent) to clean up any state that your code sets.

Your block of code is also run on a separate thread within the interpreter from the one in which #synchronize itself is called (in case that's important to you).
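The advice about ensure can be checked without Consul. The sketch below is a local simulation only: the method name `cleanup_runs_when_killed?` is made up for illustration, and `sleep` stands in for whatever work your block would do. It shows that an ensure clause still runs when the thread it belongs to is killed, which is what happens to the worker thread if the lock is lost.

```ruby
# Returns true if the worker's ensure clause ran after the thread was killed.
# Hypothetical helper for illustration; nothing here talks to Consul.
def cleanup_runs_when_killed?
  cleaned_up = false
  started    = Queue.new

  worker = Thread.new do
    begin
      started << true    # signal that we are inside the protected region
      sleep              # stands in for long-running work under the lock
    ensure
      cleaned_up = true  # runs even though the thread is killed mid-work
    end
  end

  started.pop            # wait until the worker is inside begin/ensure
  worker.kill            # the same call #synchronize makes on its worker
  worker.join            # wait for the ensure clause to finish
  cleaned_up
end

puts cleanup_runs_when_killed?
```

Anything your block sets up before it enters its own begin/ensure is not covered, so it is safest to open the ensure-protected region before acquiring resources that need releasing.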

Yields:

  • your code will be run once we have acquired the lock which controls this mutex.

Raises:

  • (ArgumentError)

    if no block is passed to this method.

  • (RuntimeError)

    if some sort of internal logic error occurs (always a bug in this code, please report)

  • (SocketError)

    if a mysterious socket-related error occurs.

  • (ConsulError)

    if an error occurs talking to Consul. This is indicative of a problem with the Consul agent, or the network.

  • (ThreadExceptionError)

    if the worker thread exits with an exception during execution. This is indicative of a problem in your code.

  • (LostLockError)

    if the Consul lock is somehow disrupted while the worker thread is running. This should only happen if the Consul cluster has a serious problem of some sort, or someone fiddles with the lock key behind our backs. Find who is fiddling with the key, and break their fingers.



# File 'lib/consul/mutex.rb', line 149

def synchronize
  unless block_given?
    raise ArgumentError,
          "No block passed to #{self.inspect}#synchronize"
  end

  acquire_lock
  
  begin
    worker_thread  = Thread.new { yield }
    watcher_thread = Thread.new { hold_lock }

    tw = ThreadsWait.new(worker_thread, watcher_thread)
    finished_thread = tw.next_wait

    err = nil

    case finished_thread
      when worker_thread
        # Work completed successfully... excellent
        return worker_thread.value

      when watcher_thread
        # We lost the lock... fiddlesticks
        
        # May as well delete our session now, it's useless
        delete_session

        k = watcher_thread.value
        
        msg = if k.nil?
          "Lost lock, key deleted!"
        else
          if k.session.nil?
            "Lost lock, no active session!"
          else
            "Lost lock to session '#{k.session}'"
          end
        end

        raise LostLockError, msg
      else
        raise RuntimeError,
              "Mysterious return value from `ThreadsWait#next_wait`: #{finished_thread.inspect}"
    end
  ensure
    watcher_thread.kill
    worker_thread.kill

    begin
      worker_thread.join
    rescue Exception => ex
      worker_thread = ex
    end

    begin
      watcher_thread.join
    rescue Exception => ex
      watcher_thread = ex
    end

    release_lock if @session_id

    if worker_thread.is_a?(Exception)
      raise ThreadExceptionError.new(
              "Worker thread raised exception",
              worker_thread
            )
    end
    
    if watcher_thread.is_a?(Exception)
      if watcher_thread.is_a?(ConsulError)
        raise watcher_thread
      else
        raise RuntimeError,
              "Watcher thread raised exception: " +
              "#{watcher_thread.message} " +
              "(#{watcher_thread.class})"
      end
    end
  end
end
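The worker/watcher arrangement in the listing above can be tried locally with a simplified simulation. This is a sketch under stated assumptions, not the library's implementation: `run_guarded` and `SimulatedLostLock` are hypothetical names, the watcher is any callable that returns when the pretend lock is lost, and a `Queue` takes the place of `ThreadsWait` so that nothing beyond core Ruby is needed. It does not contact Consul and skips session handling and the exception wrapping done in the real ensure block.

```ruby
# Stand-in for Consul::Mutex::LostLockError, defined locally for this sketch.
class SimulatedLostLock < RuntimeError; end

# Hypothetical helper: runs the block on a worker thread while `watcher`
# (a callable that returns only when the simulated lock is lost) runs on
# a second thread. Whichever thread finishes first decides the outcome.
def run_guarded(watcher)
  finished = Queue.new

  worker_thread = Thread.new do
    begin
      yield
    ensure
      finished << :worker   # announce completion, normal or not
    end
  end

  watcher_thread = Thread.new do
    begin
      watcher.call
    ensure
      finished << :watcher
    end
  end

  case finished.pop         # blocks until the first thread reports in
  when :worker
    worker_thread.value     # block's result (re-raises if the block raised)
  when :watcher
    raise SimulatedLostLock, 'Lost the simulated lock'
  end
ensure
  # Whatever happened, stop both threads before returning to the caller.
  [worker_thread, watcher_thread].compact.each(&:kill)
end

# The worker finishes first: its return value is passed through.
puts run_guarded(-> { sleep }) { 6 * 7 }

# The watcher finishes first: the worker is killed and an error is raised.
begin
  run_guarded(-> { sleep 0.1 }) { sleep 5 }
rescue SimulatedLostLock => e
  puts e.message
end
```

As in the real method, the caller only ever sees one of two outcomes: the block's own result, or an exception saying the lock went away while the block was still running.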