Class: DistributedLock::GoogleCloudStorage::Lock
- Inherits: Object
  - Object
  - DistributedLock::GoogleCloudStorage::Lock
- Includes: Utils
- Defined in: lib/distributed-lock-google-cloud-storage/lock.rb
Constant Summary
- DEFAULT_INSTANCE_IDENTITY_PREFIX_WITHOUT_PID = SecureRandom.hex(12).freeze
Class Method Summary
- .default_instance_identity_prefix ⇒ String
  Generates a sane default instance identity prefix string.
Instance Method Summary
- #abandon ⇒ void
  Pretends like we’ve never obtained this lock, abandoning our internal state about the lock.
- #check_health! ⇒ void
  Checks whether the lock is healthy.
- #healthy? ⇒ Boolean
  Returns whether the lock is healthy.
- #initialize(bucket_name:, path:, instance_identity_prefix: self.class.default_instance_identity_prefix, thread_safe: true, logger: Logger.new($stderr), logger_mutex: Mutex.new, ttl: DEFAULT_TTL, refresh_interval: nil, max_refresh_fails: DEFAULT_MAX_REFRESH_FAILS, backoff_min: DEFAULT_BACKOFF_MIN, backoff_max: DEFAULT_BACKOFF_MAX, backoff_multiplier: DEFAULT_BACKOFF_MULTIPLIER, object_acl: nil, cloud_storage_options: nil, cloud_storage_bucket_options: nil) ⇒ Lock (constructor)
  Creates a new Lock instance.
- #lock(timeout: 2 * @ttl) ⇒ void
  Obtains the lock.
- #locked_according_to_internal_state? ⇒ Boolean
  Returns whether this Lock instance’s internal state believes that the lock is currently held by this instance.
- #locked_according_to_server? ⇒ Boolean
  Returns whether the server believes that the lock is currently held by somebody.
- #owned_according_to_internal_state? ⇒ Boolean
  Returns whether this Lock instance’s internal state believes that the lock is held by the current Lock instance in the calling thread.
- #owned_according_to_server? ⇒ Boolean
  Returns whether the server believes that the lock is held by the current Lock instance in the calling thread.
- #synchronize ⇒ Object
  Obtains the lock, runs the block, and releases the lock when the block completes.
- #unlock ⇒ Boolean
  Releases the lock and stops refreshing the lock in the background.
Constructor Details
#initialize(bucket_name:, path:, instance_identity_prefix: self.class.default_instance_identity_prefix, thread_safe: true, logger: Logger.new($stderr), logger_mutex: Mutex.new, ttl: DEFAULT_TTL, refresh_interval: nil, max_refresh_fails: DEFAULT_MAX_REFRESH_FAILS, backoff_min: DEFAULT_BACKOFF_MIN, backoff_max: DEFAULT_BACKOFF_MAX, backoff_multiplier: DEFAULT_BACKOFF_MULTIPLIER, object_acl: nil, cloud_storage_options: nil, cloud_storage_bucket_options: nil) ⇒ Lock
The logger must either be thread-safe, or all writes to this logger by anything besides this `Lock` instance must be synchronized through `logger_mutex`. This is because the logger will be written to by a background thread.
Creates a new Lock instance.
Under the hood we’ll instantiate a [Google::Cloud::Storage::Bucket](googleapis.dev/ruby/google-cloud-storage/latest/Google/Cloud/Storage/Bucket.html) object for accessing the bucket. You can customize the project ID, authentication method, etc. through `cloud_storage_options` and `cloud_storage_bucket_options`.
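The `logger_mutex` requirement above can be sketched as follows. This is a minimal illustration rather than code from the gem: `SynchronizedAppLog` and `demo_synchronized_logging` are hypothetical names, and a `StringIO` stands in for the real log destination. The idea is that application code writes to the shared logger only while holding the same mutex that would be passed to the constructor as `logger_mutex:`.

```ruby
require 'logger'
require 'stringio'

# Hypothetical helper: wraps a logger together with the mutex that would
# also be handed to Lock.new as `logger:` and `logger_mutex:`.
class SynchronizedAppLog
  def initialize(logger, mutex)
    @logger = logger
    @mutex = mutex
  end

  # Application code logs only through this method, so its writes cannot
  # interleave with writes from another thread that holds the same mutex.
  def info(message)
    @mutex.synchronize { @logger.info(message) }
  end
end

# Writes from several threads and returns the resulting log lines.
def demo_synchronized_logging
  output = StringIO.new
  app_log = SynchronizedAppLog.new(Logger.new(output), Mutex.new)
  threads = 4.times.map do |i|
    Thread.new { app_log.info("worker #{i} checking in") }
  end
  threads.each(&:join)
  output.string.lines
end
```

If the logger in use is already thread-safe, the wrapper is unnecessary; the sketch only covers the case where it is not.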
    # File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 81
    def initialize(bucket_name:, path:, instance_identity_prefix: self.class.default_instance_identity_prefix,
      thread_safe: true, logger: Logger.new($stderr), logger_mutex: Mutex.new,
      ttl: DEFAULT_TTL, refresh_interval: nil, max_refresh_fails: DEFAULT_MAX_REFRESH_FAILS,
      backoff_min: DEFAULT_BACKOFF_MIN, backoff_max: DEFAULT_BACKOFF_MAX,
      backoff_multiplier: DEFAULT_BACKOFF_MULTIPLIER,
      object_acl: nil, cloud_storage_options: nil, cloud_storage_bucket_options: nil)

      check_refresh_interval_allowed!(ttl, refresh_interval, max_refresh_fails)
      check_backoff_min!(backoff_min)
      check_backoff_max!(backoff_max, backoff_min)
      check_backoff_multiplier!(backoff_multiplier)

      ### Read-only variables (safe to access concurrently) ###

      @bucket_name = bucket_name
      @path = path
      @instance_identity_prefix = instance_identity_prefix
      @thread_safe = thread_safe
      @logger = logger
      @logger_mutex = logger_mutex
      @ttl = ttl
      @refresh_interval = refresh_interval || ttl.to_f / (max_refresh_fails * 3)
      @max_refresh_fails = max_refresh_fails
      @backoff_min = backoff_min
      @backoff_max = backoff_max
      @backoff_multiplier = backoff_multiplier
      @object_acl = object_acl

      @client = create_gcloud_storage_client()
      @bucket = get_gcloud_storage_bucket(@client, bucket_name, )
      @state_mutex = Mutex.new
      @refresher_cond = ConditionVariable.new

      ### Read-write variables protected by @state_mutex ###

      @owner = nil
      @metageneration = nil
      @refresher_thread = nil

      # The refresher generation is incremented every time we shutdown
      # the refresher thread. It allows the refresher thread to know
      # whether it's being shut down (and thus shouldn't access/modify
      # state).
      @refresher_generation = 0
    end
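The constructor derives a refresh interval when none is given. The sketch below isolates that one expression so the arithmetic is easy to check. `default_refresh_interval` is a hypothetical helper name, and the example values are assumptions, not the gem's `DEFAULT_*` constants (whose values are not shown on this page).

```ruby
# Mirrors the expression used in the constructor: an explicit
# refresh_interval wins, otherwise one is derived from the TTL and the
# number of refresh failures that are tolerated.
def default_refresh_interval(ttl:, max_refresh_fails:, refresh_interval: nil)
  refresh_interval || ttl.to_f / (max_refresh_fails * 3)
end

# Assumed example values, chosen only to keep the numbers round.
default_refresh_interval(ttl: 90, max_refresh_fails: 3)
# => 10.0
default_refresh_interval(ttl: 90, max_refresh_fails: 3, refresh_interval: 5)
# => 5
```

With the derived value, `max_refresh_fails` consecutive refresh periods add up to one third of the TTL.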
Class Method Details
.default_instance_identity_prefix ⇒ String
Generates a sane default instance identity prefix string. The result is identical across multiple calls in the same process. It supports forking, so that calling this method in a forked child process automatically returns a different value than when called from the parent process.
The result doesn’t include a thread identifier, which is why we call this a prefix.
    # File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 26
    def self.default_instance_identity_prefix
      "#{DEFAULT_INSTANCE_IDENTITY_PREFIX_WITHOUT_PID}-#{Process.pid}"
    end
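The fork behaviour described above follows from combining a random value that is fixed at load time with a PID that is read on every call. A standalone sketch of the same idea, using hypothetical names (`PREFIX_WITHOUT_PID`, `instance_identity_prefix`, `instance_identity_prefix_in_child`) and assuming a platform where `fork` is available:

```ruby
require 'securerandom'

# Computed once when the file is loaded, so it stays the same for the
# lifetime of the process and is inherited unchanged by forked children.
PREFIX_WITHOUT_PID = SecureRandom.hex(12).freeze

# The PID is looked up on every call, so a forked child (which has its own
# PID) gets a different result without any extra bookkeeping.
def instance_identity_prefix
  "#{PREFIX_WITHOUT_PID}-#{Process.pid}"
end

# Returns the prefix as computed inside a forked child process.
def instance_identity_prefix_in_child
  reader, writer = IO.pipe
  pid = fork do
    reader.close
    writer.write(instance_identity_prefix)
    writer.close
    exit!(0) # skip at_exit handlers inherited from the parent
  end
  writer.close
  result = reader.read
  reader.close
  Process.wait(pid)
  result
end
```

Parent and child share the random part and differ only in the PID suffix.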
Instance Method Details
#abandon ⇒ void
This method returns an undefined value.
Pretends like we’ve never obtained this lock, abandoning our internal state about the lock.
Shuts down background lock refreshing, and ensures that #locked_according_to_internal_state? returns false.
Does not modify any server data, so #locked_according_to_server? may still return true.
    # File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 282
    def abandon
      refresher_generation = nil
      thread = nil

      @state_mutex.synchronize do
        if unsynced_locked_according_to_internal_state?
          refresher_generation = @refresher_generation
          thread = shutdown_refresher_thread
        end
      end

      if thread
        log_debug { "Abandoning locked lock" }
        thread.join
        log_debug { "Done abandoned locked lock. refresher_generation=#{refresher_generation}" }
      else
        log_debug { "Abandoning unlocked lock" }
      end
    end
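The difference between abandoning and unlocking can be illustrated with a toy in-memory model. This is not the gem's implementation: `ToyLock` is a hypothetical class, and a plain Hash stands in for the Cloud Storage bucket. It only shows why, after `abandon`, the internal state and the server state can disagree.

```ruby
# Toy in-memory model, not the gem's implementation. `server` is a Hash
# standing in for the bucket; every name here is hypothetical.
class ToyLock
  def initialize(server, path, identity)
    @server = server
    @path = path
    @identity = identity
    @owner = nil
  end

  def lock
    raise 'already held on the server' if @server.key?(@path)
    @server[@path] = @identity
    @owner = @identity
    nil
  end

  # Forgets local ownership only; the server entry is left untouched.
  def abandon
    @owner = nil
  end

  # Forgets local ownership and deletes the server entry.
  def unlock
    raise 'not locked' if @owner.nil?
    @owner = nil
    !@server.delete(@path).nil?
  end

  def locked_according_to_internal_state?
    !@owner.nil?
  end

  def locked_according_to_server?
    @server.key?(@path)
  end
end

server = {}
toy = ToyLock.new(server, 'locks/nightly-job', 'abc123-4242')
toy.lock
toy.abandon
toy.locked_according_to_internal_state? # => false
toy.locked_according_to_server?         # => true
```

In the real class, the leftover server object is dealt with by a later #lock call, which resets a lock that was abandoned by the same identity or that has gone stale.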
#check_health! ⇒ void
This method returns an undefined value.
Checks whether the lock is healthy. See #healthy? for the definition of “healthy”.
It only makes sense to call this method after having obtained this lock.
    # File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 328
    def check_health!
      raise LockUnhealthyError, 'Lock is not healthy' if !healthy?
    end
#healthy? ⇒ Boolean
Returns whether the lock is healthy. A lock is considered healthy until we fail to refresh the lock too many times consecutively.
Failure to refresh can happen for many reasons, including but not limited to network problems or the lock object being forcefully deleted by someone else.
“Too many” is defined by the `max_refresh_fails` argument passed to the constructor.
It only makes sense to call this method after having obtained this lock.
    # File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 314
    def healthy?
      @state_mutex.synchronize do
        raise NotLockedError, 'Not locked' if !unsynced_locked_according_to_internal_state?
        @refresher_thread.alive?
      end
    end
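The "too many consecutive failures" rule can be sketched as a small counter. `RefreshHealthTracker` is a hypothetical class, not part of the gem, and it assumes the lock turns unhealthy once the number of consecutive failures reaches `max_refresh_fails`; the exact comparison used by the refresher thread is not shown on this page.

```ruby
# Hypothetical model of the health rule: only *consecutive* refresh
# failures count, and any successful refresh resets the streak.
class RefreshHealthTracker
  def initialize(max_refresh_fails:)
    @max_refresh_fails = max_refresh_fails
    @consecutive_fails = 0
  end

  def record_success
    @consecutive_fails = 0
  end

  def record_failure
    @consecutive_fails += 1
  end

  # Assumption: unhealthy once the streak reaches max_refresh_fails.
  def healthy?
    @consecutive_fails < @max_refresh_fails
  end
end
```

In the listing above, `healthy?` reports whether the refresher thread is still alive, so the thread ending is what signals that refreshing has been given up.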
#lock(timeout: 2 * @ttl) ⇒ void
This method returns an undefined value.
Obtains the lock. If the lock is stale, resets it automatically. If the lock is already obtained by some other instance identity, waits until it becomes available, or until timeout.
    # File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 180
    def lock(timeout: 2 * @ttl)
      raise AlreadyLockedError, 'Already locked' if owned_according_to_internal_state?

      file = retry_with_backoff_until_success(timeout,
        retry_logger: method(:log_lock_retry),
        backoff_min: @backoff_min,
        backoff_max: @backoff_max,
        backoff_multiplier: @backoff_multiplier) do

        log_debug { 'Acquiring lock' }
        if (file = create_lock_object)
          log_debug { 'Successfully acquired lock' }
          [:success, file]
        else
          log_debug { 'Error acquiring lock. Investigating why...' }
          file = @bucket.file(@path)
          if file.nil?
            log_warn { 'Lock was deleted right after having created it. Retrying.' }
            :retry_immediately
          elsif file.['identity'] == identity
            log_warn { 'Lock was already owned by this instance, but was abandoned. Resetting lock' }
            delete_lock_object(file.)
            :retry_immediately
          else
            if lock_stale?(file)
              log_warn { 'Lock is stale. Resetting lock' }
              delete_lock_object(file.)
            else
              log_debug { 'Lock was already acquired, and is not stale' }
            end
            :error
          end
        end
      end

      refresher_generation = nil
      @state_mutex.synchronize do
        @owner = identity
        @metageneration = file.
        spawn_refresher_thread
        refresher_generation = @refresher_generation
      end
      log_debug { "Locked. refresher_generation=#{refresher_generation}, metageneration=#{file.}" }
      nil
    end
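The roles of `backoff_min`, `backoff_max` and `backoff_multiplier` can be sketched with a plain capped exponential schedule. This is an assumption about the general technique, not a copy of the gem's `retry_with_backoff_until_success` helper, which is not shown on this page and may differ (for example by adding jitter). `backoff_delays` and the example values are hypothetical.

```ruby
# Returns the wait time before each retry: start at backoff_min, multiply
# by backoff_multiplier after every attempt, and never exceed backoff_max.
def backoff_delays(attempts:, backoff_min:, backoff_max:, backoff_multiplier:)
  delay = backoff_min
  Array.new(attempts) do
    current = delay
    delay = [delay * backoff_multiplier, backoff_max].min
    current
  end
end

# Assumed example values, not the gem's defaults.
backoff_delays(attempts: 6, backoff_min: 1, backoff_max: 10, backoff_multiplier: 2)
# => [1, 2, 4, 8, 10, 10]
```

In the listing above, the `:retry_immediately` result is used for the cases where the lock object was just deleted, while `:error` is returned when another identity still holds the lock.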
#locked_according_to_internal_state? ⇒ Boolean
Returns whether this Lock instance’s internal state believes that the lock is currently held by this instance. Does not check whether the lock is stale.
    # File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 134
    def locked_according_to_internal_state?
      @state_mutex.synchronize do
        unsynced_locked_according_to_internal_state?
      end
    end
#locked_according_to_server? ⇒ Boolean
Returns whether the server believes that the lock is currently held by somebody. Does not check whether the lock is stale.
    # File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 145
    def locked_according_to_server?
      !@bucket.file(@path).nil?
    end
#owned_according_to_internal_state? ⇒ Boolean
Returns whether this Lock instance’s internal state believes that the lock is held by the current Lock instance in the calling thread.
    # File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 153
    def owned_according_to_internal_state?
      @state_mutex.synchronize do
        unsynced_owned_according_to_internal_state?
      end
    end
#owned_according_to_server? ⇒ Boolean
Returns whether the server believes that the lock is held by the current Lock instance in the calling thread.
    # File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 164
    def owned_according_to_server?
      file = @bucket.file(@path)
      return false if file.nil?
      file.['identity'] == identity
    end
#synchronize ⇒ Object
Obtains the lock, runs the block, and releases the lock when the block completes.
If the lock is stale, resets it automatically. If the lock is already obtained by some other instance identity, waits until it becomes available, or until timeout.
Accepts the same parameters as #lock.
    # File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 265
    def synchronize(...)
      lock(...)
      begin
        yield
      ensure
        unlock
      end
    end
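The lock, yield, ensure-unlock shape of #synchronize can be exercised without any network access. The sketch below uses a hypothetical `RecordingLock` whose `lock` and `unlock` only record that they were called; `synchronize` has the same body as the listing above. It shows that the lock is released even when the block raises, and that arguments are forwarded to `lock`.

```ruby
# Hypothetical stand-in: lock and unlock only record their calls.
class RecordingLock
  attr_reader :events

  def initialize
    @events = []
  end

  def lock(timeout: nil)
    @events << [:lock, timeout]
    nil
  end

  def unlock
    @events << [:unlock]
    true
  end

  # Same shape as the real method: arguments are forwarded to #lock, and
  # #unlock runs in an ensure clause so it also runs when the block raises.
  def synchronize(...)
    lock(...)
    begin
      yield
    ensure
      unlock
    end
  end
end

recorder = RecordingLock.new
recorder.synchronize(timeout: 5) { :work_done } # => :work_done
recorder.events                                 # => [[:lock, 5], [:unlock]]
```

Because `lock(...)` sits outside the `begin` block, a failure to obtain the lock means the block never runs and `unlock` is not called.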
#unlock ⇒ Boolean
Releases the lock and stops refreshing the lock in the background.
    # File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 233
    def unlock
      refresher_generation = nil
      = nil
      thread = nil

      @state_mutex.synchronize do
        raise NotLockedError, 'Not locked' if !unsynced_locked_according_to_internal_state?
        refresher_generation = @refresher_generation
        thread = shutdown_refresher_thread
        = @metageneration
        @owner = nil
        @metageneration = nil
      end

      thread.join
      result = delete_lock_object()
      log_debug { "Unlocked. refresher_generation=#{refresher_generation}, metageneration=#{}" }
      result
    end