Class: DistributedLock::GoogleCloudStorage::Lock

Inherits:
Object
Includes:
Utils
Defined in:
lib/distributed-lock-google-cloud-storage/lock.rb

Constant Summary

DEFAULT_INSTANCE_IDENTITY_PREFIX_WITHOUT_PID =
SecureRandom.hex(12).freeze

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(bucket_name:, path:, instance_identity_prefix: self.class.default_instance_identity_prefix, thread_safe: true, logger: Logger.new($stderr), logger_mutex: Mutex.new, ttl: DEFAULT_TTL, refresh_interval: nil, max_refresh_fails: DEFAULT_MAX_REFRESH_FAILS, backoff_min: DEFAULT_BACKOFF_MIN, backoff_max: DEFAULT_BACKOFF_MAX, backoff_multiplier: DEFAULT_BACKOFF_MULTIPLIER, object_acl: nil, cloud_storage_options: nil, cloud_storage_bucket_options: nil) ⇒ Lock

Note:

The logger must either be thread-safe, or all writes to this logger by anything besides this `Lock` instance must be synchronized through `logger_mutex`. This is because a background thread also writes to the logger.
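The synchronization requirement in this note can be sketched with the standard library alone. This is a minimal illustration, not the library's own code: `log_output`, `log_info` and the worker threads are hypothetical names, and the same `logger` and `logger_mutex` objects are assumed to be the ones passed to the constructor.

```ruby
require 'logger'
require 'stringio'

# Objects that would also be passed as `logger:` and `logger_mutex:`
# to the constructor (assumption: the application shares them).
log_output = StringIO.new
logger = Logger.new(log_output)
logger_mutex = Mutex.new

# Application-side helper (hypothetical): every write made outside the
# Lock instance goes through the shared mutex.
log_info = lambda do |message|
  logger_mutex.synchronize { logger.info(message) }
end

threads = 4.times.map do |i|
  Thread.new { log_info.call("worker #{i} started") }
end
threads.each(&:join)

LOGGED_LINES = log_output.string.lines
puts LOGGED_LINES.size
```

Routing every application write through one helper keeps the rule easy to audit: no code path can reach the logger without holding the mutex.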

Creates a new Lock instance.

Under the hood we’ll instantiate a [Google::Cloud::Storage::Bucket](googleapis.dev/ruby/google-cloud-storage/latest/Google/Cloud/Storage/Bucket.html) object for accessing the bucket. You can customize the project ID, authentication method, etc. through `cloud_storage_options` and `cloud_storage_bucket_options`.

Parameters:

  • bucket_name (String)

    The name of a Cloud Storage bucket in which to place the lock. This bucket must already exist.

  • path (String)

    The object path within the bucket to use for locking.

  • instance_identity_prefix (String) (defaults to: self.class.default_instance_identity_prefix)

    A unique identifier for the client of this lock, excluding its thread identity. Learn more in the readme, section “Instant recovery from stale locks”.

  • thread_safe (Boolean) (defaults to: true)

    Whether this Lock instance should be thread-safe. When true, the thread’s identity will be included in the instance identity.

  • logger (defaults to: Logger.new($stderr))

    A Logger-compatible object to log progress to. See also the note about thread-safety.

  • logger_mutex (defaults to: Mutex.new)

    A Mutex to synchronize multithreaded writes to the logger.

  • ttl (Numeric) (defaults to: DEFAULT_TTL)

    The lock is considered stale if its age (in seconds) exceeds this value. This value should be generous, on the order of minutes.

  • refresh_interval (Numeric, nil) (defaults to: nil)

    We’ll refresh the lock’s timestamp every `refresh_interval` seconds. This value should be many times smaller than `ttl`, in order to account for network delays, temporary network errors, and events that cause the lock to become unhealthy.

    This value must be smaller than `ttl / max_refresh_fails`.

    Default: `ttl / (max_refresh_fails * 3)`

  • max_refresh_fails (Integer) (defaults to: DEFAULT_MAX_REFRESH_FAILS)

    The lock will be declared unhealthy if refreshing fails with a temporary error this many times consecutively. If refreshing fails with a permanent error, then the lock is immediately declared unhealthy regardless of this value.

  • backoff_min (Numeric) (defaults to: DEFAULT_BACKOFF_MIN)

    Minimum amount of time, in seconds, to back off when waiting for a lock to become available. Must be at least 0.

  • backoff_max (Numeric) (defaults to: DEFAULT_BACKOFF_MAX)

    Maximum amount of time, in seconds, to back off when waiting for a lock to become available. Must be at least `backoff_min`.

  • backoff_multiplier (Numeric) (defaults to: DEFAULT_BACKOFF_MULTIPLIER)

    Factor by which to increase the backoff time each time acquiring the lock fails. Must be at least 0.

  • object_acl (String, nil) (defaults to: nil)

    A predefined set of access controls to apply to the Cloud Storage object. See the `acl` parameter in [Google::Cloud::Storage::Bucket#create_file](googleapis.dev/ruby/google-cloud-storage/latest/Google/Cloud/Storage/Bucket.html#create_file-instance_method) for acceptable values.

  • cloud_storage_options (Hash, nil) (defaults to: nil)

    Additional options to pass to Google::Cloud::Storage.new. See its documentation to learn which options are available.

  • cloud_storage_bucket_options (Hash, nil) (defaults to: nil)

    Additional options to pass to Google::Cloud::Storage::Project#bucket. See its documentation to learn which options are available.

Raises:

  • (ArgumentError)

    When an invalid argument is detected.
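The relationship between `ttl`, `refresh_interval` and `max_refresh_fails` described above can be checked with a little arithmetic. The sketch below is illustrative only: the helper names are hypothetical, and the example values are assumptions, since the actual `DEFAULT_TTL` and `DEFAULT_MAX_REFRESH_FAILS` are not shown in this document.

```ruby
# Illustrative values only (assumption, not the library's defaults).
EXAMPLE_TTL = 300
EXAMPLE_MAX_REFRESH_FAILS = 3

# Mirrors the documented default: ttl / (max_refresh_fails * 3).
def default_refresh_interval(ttl, max_refresh_fails)
  ttl.to_f / (max_refresh_fails * 3)
end

# Mirrors the documented constraint:
# refresh_interval must be smaller than ttl / max_refresh_fails.
def refresh_interval_allowed?(ttl, refresh_interval, max_refresh_fails)
  refresh_interval < ttl.to_f / max_refresh_fails
end

interval = default_refresh_interval(EXAMPLE_TTL, EXAMPLE_MAX_REFRESH_FAILS)
puts format('refresh every %.1f s', interval)
puts refresh_interval_allowed?(EXAMPLE_TTL, interval, EXAMPLE_MAX_REFRESH_FAILS)
puts refresh_interval_allowed?(EXAMPLE_TTL, 100, EXAMPLE_MAX_REFRESH_FAILS)
```

With the default interval, `max_refresh_fails` consecutive refresh attempts are spaced over roughly a third of `ttl`, so an unhealthy lock should be detected well before other clients would consider it stale.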



# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 81

def initialize(bucket_name:, path:, instance_identity_prefix: self.class.default_instance_identity_prefix,
  thread_safe: true, logger: Logger.new($stderr), logger_mutex: Mutex.new,
  ttl: DEFAULT_TTL, refresh_interval: nil, max_refresh_fails: DEFAULT_MAX_REFRESH_FAILS,
  backoff_min: DEFAULT_BACKOFF_MIN, backoff_max: DEFAULT_BACKOFF_MAX,
  backoff_multiplier: DEFAULT_BACKOFF_MULTIPLIER,
  object_acl: nil, cloud_storage_options: nil, cloud_storage_bucket_options: nil)

  check_refresh_interval_allowed!(ttl, refresh_interval, max_refresh_fails)
  check_backoff_min!(backoff_min)
  check_backoff_max!(backoff_max, backoff_min)
  check_backoff_multiplier!(backoff_multiplier)


  ### Read-only variables (safe to access concurrently) ###

  @bucket_name = bucket_name
  @path = path
  @instance_identity_prefix = instance_identity_prefix
  @thread_safe = thread_safe
  @logger = logger
  @logger_mutex = logger_mutex
  @ttl = ttl
  @refresh_interval = refresh_interval || ttl.to_f / (max_refresh_fails * 3)
  @max_refresh_fails = max_refresh_fails
  @backoff_min = backoff_min
  @backoff_max = backoff_max
  @backoff_multiplier = backoff_multiplier
  @object_acl = object_acl

  @client = create_gcloud_storage_client(cloud_storage_options)
  @bucket = get_gcloud_storage_bucket(@client, bucket_name, cloud_storage_bucket_options)

  @state_mutex = Mutex.new
  @refresher_cond = ConditionVariable.new


  ### Read-write variables protected by @state_mutex ###

  @owner = nil
  @metageneration = nil
  @refresher_thread = nil

  # The refresher generation is incremented every time we shutdown
  # the refresher thread. It allows the refresher thread to know
  # whether it's being shut down (and thus shouldn't access/modify
  # state).
  @refresher_generation = 0
end

Class Method Details

.default_instance_identity_prefix ⇒ String

Generates a sane default instance identity prefix string. The result is identical across multiple calls in the same process. It supports forking, so that calling this method in a forked child process automatically returns a different value than when called from the parent process.

The result doesn’t include a thread identifier, which is why we call this a prefix.

Returns:

  • (String)


# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 26

def self.default_instance_identity_prefix
  "#{DEFAULT_INSTANCE_IDENTITY_PREFIX_WITHOUT_PID}-#{Process.pid}"
end
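The fork behavior follows from combining a random part computed once at load time with the current PID. The sketch below reproduces that idea with stand-in names (`PREFIX_WITHOUT_PID`, `example_identity_prefix` and `example_identity_prefix_in_child` are hypothetical) so that it runs without the library. The fork part assumes a platform where `fork` is available.

```ruby
require 'securerandom'

# Stand-in for the constant shown in the Constant Summary.
PREFIX_WITHOUT_PID = SecureRandom.hex(12).freeze

# Stand-in for the class method above: random part plus current PID.
def example_identity_prefix
  "#{PREFIX_WITHOUT_PID}-#{Process.pid}"
end

# Evaluates example_identity_prefix inside a forked child and returns
# the child's result to the parent through a pipe.
def example_identity_prefix_in_child
  reader, writer = IO.pipe
  pid = fork do
    reader.close
    writer.write(example_identity_prefix)
    writer.close
  end
  writer.close
  result = reader.read
  reader.close
  Process.wait(pid)
  result
end

# Same process: repeated calls agree.
puts example_identity_prefix == example_identity_prefix

if Process.respond_to?(:fork)
  # Forked child: the random part is inherited, but the PID differs.
  puts example_identity_prefix == example_identity_prefix_in_child
end
```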

Instance Method Details

#abandon ⇒ void

This method returns an undefined value.

Pretends that we’ve never obtained this lock, abandoning our internal state about the lock.

Shuts down background lock refreshing, and ensures that #locked_according_to_internal_state? returns false.

Does not modify any server data, so #locked_according_to_server? may still return true.



# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 282

def abandon
  refresher_generation = nil
  thread = nil

  @state_mutex.synchronize do
    if unsynced_locked_according_to_internal_state?
      refresher_generation = @refresher_generation
      thread = shutdown_refresher_thread
    end
  end

  if thread
    log_debug { "Abandoning locked lock" }
    thread.join
    log_debug { "Done abandoned locked lock. refresher_generation=#{refresher_generation}" }
  else
    log_debug { "Abandoning unlocked lock" }
  end
end
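The difference between abandoning and unlocking can be pictured with a small in-memory model. This is a hypothetical sketch, not the library's implementation: `ModelLock` is an invented class, and a plain Hash stands in for the Cloud Storage bucket.

```ruby
# Hypothetical in-memory model; `server` stands in for the bucket.
class ModelLock
  def initialize(server, path, identity)
    @server = server
    @path = path
    @identity = identity
    @owner = nil
  end

  def lock
    raise 'already held' if @server.key?(@path)
    @server[@path] = @identity
    @owner = @identity
    nil
  end

  # Forgets local ownership only; the server-side object stays in place.
  def abandon
    @owner = nil
  end

  # Forgets local ownership and deletes the server-side object.
  def unlock
    raise 'not locked' if @owner.nil?
    @owner = nil
    !@server.delete(@path).nil?
  end

  def locked_according_to_internal_state?
    !@owner.nil?
  end

  def locked_according_to_server?
    @server.key?(@path)
  end
end

server = {}
lock = ModelLock.new(server, 'locks/job', 'instance-a')
lock.lock
lock.abandon
puts lock.locked_according_to_internal_state?
puts lock.locked_according_to_server?
```

In the model, as in the description above, an abandoned lock leaves its server-side object behind, so other clients still see the lock as held until it is deleted or becomes stale.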

#check_health! ⇒ void

This method returns an undefined value.

Checks whether the lock is healthy. See #healthy? for the definition of “healthy”.

It only makes sense to call this method after having obtained this lock.

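Raises:

  • (LockUnhealthyError)

    The lock is not healthy.

  • (NotLockedError)

    This Lock instance, according to its internal state, believes that it isn’t currently holding the lock.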


# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 328

def check_health!
  raise LockUnhealthyError, 'Lock is not healthy' if !healthy?
end

#healthy? ⇒ Boolean

Returns whether the lock is healthy. A lock is considered healthy until we fail to refresh the lock too many times consecutively.

Failure to refresh could happen for many reasons, including but not limited to: network problems, the lock object being forcefully deleted by someone else.

“Too many” is defined by the `max_refresh_fails` argument passed to the constructor.

It only makes sense to call this method after having obtained this lock.

Returns:

  • (Boolean)

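Raises:

  • (NotLockedError)

    This Lock instance, according to its internal state, believes that it isn’t currently holding the lock.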


# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 314

def healthy?
  @state_mutex.synchronize do
    raise NotLockedError, 'Not locked' if !unsynced_locked_according_to_internal_state?
    @refresher_thread.alive?
  end
end
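One way to use the health check is to call it between units of work while holding the lock, so that a long-running job stops once the lock can no longer be trusted. The sketch below is an assumption about usage, not part of the library: `FlakyLock`, `ExampleLockUnhealthyError` and `process_while_healthy` are hypothetical stand-ins so the example runs without Cloud Storage.

```ruby
# Hypothetical stand-ins for the library's lock and error class.
class ExampleLockUnhealthyError < StandardError; end

class FlakyLock
  def initialize(healthy_checks)
    @healthy_checks = healthy_checks
  end

  # Raises once the configured number of healthy checks is used up.
  def check_health!
    raise ExampleLockUnhealthyError, 'Lock is not healthy' if @healthy_checks <= 0
    @healthy_checks -= 1
  end
end

# Processes items one at a time, verifying lock health before each step.
# Returns the items completed before the lock became unhealthy.
def process_while_healthy(lock, items)
  done = []
  items.each do |item|
    lock.check_health!
    done << item.upcase
  end
  done
rescue ExampleLockUnhealthyError
  done
end

p process_while_healthy(FlakyLock.new(2), %w[a b c])
```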

#lock(timeout: 2 * @ttl) ⇒ void

This method returns an undefined value.

Obtains the lock. If the lock is stale, resets it automatically. If the lock is already obtained by some other instance identity, waits until it becomes available, or until timeout.

Parameters:

  • timeout (Numeric) (defaults to: 2 * @ttl)

    The timeout in seconds.

Raises:

  • (AlreadyLockedError)

    This Lock instance — according to its internal state — believes that it’s already holding the lock.

  • (TimeoutError)

    Failed to acquire the lock within `timeout` seconds.

  • (Google::Cloud::Error)


# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 180

def lock(timeout: 2 * @ttl)
  raise AlreadyLockedError, 'Already locked' if owned_according_to_internal_state?

  file = retry_with_backoff_until_success(timeout,
    retry_logger: method(:log_lock_retry),
    backoff_min: @backoff_min,
    backoff_max: @backoff_max,
    backoff_multiplier: @backoff_multiplier) do

    log_debug { 'Acquiring lock' }
    if (file = create_lock_object)
      log_debug { 'Successfully acquired lock' }
      [:success, file]
    else
      log_debug { 'Error acquiring lock. Investigating why...' }
      file = @bucket.file(@path)
      if file.nil?
        log_warn { 'Lock was deleted right after having created it. Retrying.' }
        :retry_immediately
      elsif file.metadata['identity'] == identity
        log_warn { 'Lock was already owned by this instance, but was abandoned. Resetting lock' }
        delete_lock_object(file.metageneration)
        :retry_immediately
      else
        if lock_stale?(file)
          log_warn { 'Lock is stale. Resetting lock' }
          delete_lock_object(file.metageneration)
        else
          log_debug { 'Lock was already acquired, and is not stale' }
        end
        :error
      end
    end
  end

  refresher_generation = nil
  @state_mutex.synchronize do
    @owner = identity
    @metageneration = file.metageneration
    spawn_refresher_thread
    refresher_generation = @refresher_generation
  end
  log_debug { "Locked. refresher_generation=#{refresher_generation}, metageneration=#{file.metageneration}" }
  nil
end
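The block passed to `retry_with_backoff_until_success` returns `[:success, file]`, `:retry_immediately` or `:error`. The helper itself is not shown in this document, so the following is only a guess at how such a loop might combine those results with `backoff_min`, `backoff_max` and `backoff_multiplier`. `example_retry_with_backoff`, `ExampleTimeoutError`, `sleeper` and `clock` are hypothetical names, and the sketch leaves out any jitter the real helper may apply.

```ruby
class ExampleTimeoutError < StandardError; end

# Hypothetical retry loop. `sleeper` and `clock` are injectable so the
# sketch can be exercised without actually waiting.
def example_retry_with_backoff(timeout, backoff_min:, backoff_max:, backoff_multiplier:,
                               sleeper: method(:sleep),
                               clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
  deadline = clock.call + timeout
  backoff = backoff_min

  loop do
    result, value = yield
    case result
    when :success
      return value
    when :retry_immediately
      # Try again at once; the deadline is still enforced below.
    when :error
      sleeper.call(backoff)
      # Grow the delay, keeping it within [backoff_min, backoff_max].
      backoff = (backoff * backoff_multiplier).clamp(backoff_min, backoff_max)
    else
      raise ArgumentError, "unexpected result: #{result.inspect}"
    end
    raise ExampleTimeoutError, 'timed out' if clock.call >= deadline
  end
end

# Usage: fail four times, then succeed, recording the delays requested.
delays = []
attempts = 0
value = example_retry_with_backoff(60, backoff_min: 1, backoff_max: 5, backoff_multiplier: 2,
                                       sleeper: ->(seconds) { delays << seconds }) do
  attempts += 1
  attempts < 5 ? :error : [:success, 'lock-file']
end
p value
p delays
```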

#locked_according_to_internal_state? ⇒ Boolean

Returns whether this Lock instance’s internal state believes that the lock is currently held by this instance. Does not check whether the lock is stale.

Returns:

  • (Boolean)


# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 134

def locked_according_to_internal_state?
  @state_mutex.synchronize do
    unsynced_locked_according_to_internal_state?
  end
end

#locked_according_to_server? ⇒ Boolean

Returns whether the server believes that the lock is currently held by somebody. Does not check whether the lock is stale.

Returns:

  • (Boolean)

Raises:

  • (Google::Cloud::Error)


# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 145

def locked_according_to_server?
  !@bucket.file(@path).nil?
end

#owned_according_to_internal_state? ⇒ Boolean

Returns whether this Lock instance’s internal state believes that the lock is held by the current Lock instance in the calling thread.

Returns:

  • (Boolean)


# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 153

def owned_according_to_internal_state?
  @state_mutex.synchronize do
    unsynced_owned_according_to_internal_state?
  end
end

#owned_according_to_server? ⇒ Boolean

Returns whether the server believes that the lock is held by the current Lock instance in the calling thread.

Returns:

  • (Boolean)

Raises:

  • (Google::Cloud::Error)


# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 164

def owned_according_to_server?
  file = @bucket.file(@path)
  return false if file.nil?
  file.metadata['identity'] == identity
end

#synchronize ⇒ Object

Obtains the lock, runs the block, and releases the lock when the block completes.

If the lock is stale, resets it automatically. If the lock is already obtained by some other instance identity, waits until it becomes available, or until timeout.

Accepts the same parameters as #lock.

Returns:

  • The block’s return value.

Raises:

  • (AlreadyLockedError)

    This Lock instance — according to its internal state — believes that it’s already holding the lock.

  • (TimeoutError)

    Failed to acquire the lock within `timeout` seconds.



# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 265

def synchronize(...)
  lock(...)
  begin
    yield
  ensure
    unlock
  end
end
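The `ensure` clause in the listing above means the lock is released whether the block returns normally or raises. Because `lock` is called outside the `begin`, a failure to acquire the lock does not trigger `unlock`. The sketch below demonstrates the first point with a hypothetical `RecordingLock` that records calls instead of talking to Cloud Storage. It assumes Ruby 2.7 or later for the `(...)` argument forwarding.

```ruby
# Hypothetical stand-in that records calls instead of contacting a server.
class RecordingLock
  attr_reader :events

  def initialize
    @events = []
  end

  def lock(timeout: 10)
    @events << :lock
    nil
  end

  def unlock
    @events << :unlock
    true
  end

  # Same shape as the method above: lock, yield, always unlock.
  def synchronize(...)
    lock(...)
    begin
      yield
    ensure
      unlock
    end
  end
end

lock = RecordingLock.new
result = lock.synchronize(timeout: 5) { :block_value }
p result

begin
  lock.synchronize { raise 'boom' }
rescue RuntimeError
  # The exception propagates, but unlock has already run.
end
p lock.events
```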

#unlock ⇒ Boolean

Releases the lock and stops refreshing the lock in the background.

Returns:

  • (Boolean)

    True if the lock object was actually deleted, false if the lock object was already deleted.

Raises:

  • (NotLockedError)

    This Lock instance — according to its internal state — believes that it isn’t currently holding the lock.

  • (Google::Cloud::Error)


# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 233

def unlock
  refresher_generation = nil
  metageneration = nil
  thread = nil

  @state_mutex.synchronize do
    raise NotLockedError, 'Not locked' if !unsynced_locked_according_to_internal_state?
    refresher_generation = @refresher_generation
    thread = shutdown_refresher_thread
    metageneration = @metageneration
    @owner = nil
    @metageneration = nil
  end

  thread.join
  result = delete_lock_object(metageneration)
  log_debug { "Unlocked. refresher_generation=#{refresher_generation}, metageneration=#{metageneration}" }
  result
end