Class: Archipelago::Treasure::Chest
- Inherits:
- Object (Object → Archipelago::Treasure::Chest)
- Includes:
- Disco::Publishable
- Defined in:
- lib/archipelago/treasure.rb
Overview
A possibly remote database that only returns proxies to its contents, and thus runs all methods on its contents itself.
Has support for optimistically locked distributed serializable transactions.
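The proxy behaviour described above can be pictured with a small, self-contained sketch. MiniChest and MiniProxy are hypothetical stand-ins invented for this illustration and are not Archipelago classes; the sketch assumes a single process and leaves out transactions entirely.

```ruby
# Hypothetical stand-ins for illustration only; not part of Archipelago.
class MiniChest
  def initialize
    @db = {}
  end

  # Store the real object inside the chest.
  def []=(key, value)
    @db[key] = value
  end

  # Hand out a proxy instead of the stored object itself.
  def [](key)
    @db.key?(key) ? MiniProxy.new(key, self) : nil
  end

  # Run the method on the stored object, inside the chest.
  def call_instance_method(key, method, *arguments, &block)
    @db[key].public_send(method, *arguments, &block)
  end
end

class MiniProxy < BasicObject
  def initialize(key, chest)
    @key = key
    @chest = chest
  end

  # Forward every call to the chest, which owns the real object.
  def method_missing(method, *arguments, &block)
    @chest.call_instance_method(@key, method, *arguments, &block)
  end
end

chest = MiniChest.new
chest[:names] = ["anne"]
proxy = chest[:names]
proxy << "bonny"    # executed on the array held by the chest
puts proxy.size     # asks the chest for the size of its own array
```

The caller never holds the array itself, so every change happens where the data lives.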
Instance Method Summary
- #[](key, transaction = nil) ⇒ Object
  Return the contents of this chest using a given key and transaction.
- #[]=(key, p1, p2 = nil) ⇒ Object
  Put something into this chest with a given key, value and transaction.
- #abort!(transaction) ⇒ Object
  Abort transaction in this Chest.
- #active_transactions ⇒ Object
  The transactions active in this Chest.
- #call_instance_method(key, method, transaction, *arguments, &block) ⇒ Object
  Call an instance method on whatever this chest holds at key with any transaction and args.
- #commit!(transaction) ⇒ Object
  Commits transaction in this Chest.
- #delete(key, transaction = nil) ⇒ Object
  Delete the value of key within transaction.
- #each(callable) ⇒ Object
  Will do callable.call(key, value) for each key-and-value pair in this Chest.
- #evaluate!(label, timestamp, data) ⇒ Object
  Evaluate data if we have not already seen label or if we have an earlier timestamp than the one given.
- #include?(key, transaction = nil) ⇒ Boolean
  Returns true if this Chest includes the given key, optionally within a transaction.
- #initialize(options = {}) ⇒ Chest (constructor)
  Initialize a Chest.
- #prepare!(transaction) ⇒ Object
  Prepares transaction in this Chest.
Methods included from Disco::Publishable
#_dump, append_features, #initialize_publishable, #publish!, #service_id, #stop!, #valid?
Constructor Details
#initialize(options = {}) ⇒ Chest
Initialize a Chest
Will use a BerkeleyHashishProvider backed by treasure_chest.db in the same directory as this file to get its hashes if no :persistence_provider is given.
Will try to recover crashed transactions every :transaction_recovery_interval seconds, or every TRANSACTION_RECOVERY_INTERVAL seconds if none is given.
Will use Archipelago::Disco::Publishable by calling initialize_publishable with options.
# File 'lib/archipelago/treasure.rb', line 255
def initialize(options = {})
  #
  # The provider of happy magic persistent hashes of different kinds.
  #
  @persistence_provider = options[:persistence_provider] ||
    Archipelago::Hashish::BerkeleyHashishProvider.new(Pathname.new(File.expand_path(__FILE__)).parent.join("treasure_chest.db"))
  #
  # Use the given options to initialize the publishable
  # instance variables.
  #
  initialize_publishable(options)
  #
  # [transaction => [key => instance]]
  # To know what stuff is visible to a transaction.
  #
  @snapshot_by_transaction = {}
  @snapshot_by_transaction.extend(Archipelago::Current::Synchronized)
  #
  # [transaction => [key => when the key was read/updated/deleted]]
  # To know if a transaction is ok to prepare and commit.
  # (The variable name is missing from the extracted listing;
  # @timestamp_by_key_by_transaction is an assumed name.)
  #
  @timestamp_by_key_by_transaction = {}
  #
  # [transaction => [key => instance]]
  # To know what transactions were prepared but not
  # properly finished last run.
  #
  @crashed = Set.new
  initialize_seen_data
  #
  # The magical persistent map that defines how we actually
  # store our data.
  #
  @db = @persistence_provider.get_cached_hashish("db")
  initialize_prepared(options[:transaction_recovery_interval] || TRANSACTION_RECOVERY_INTERVAL)
  CHEST_BY_SERVICE_ID[self.service_id] = self
end
Instance Method Details
#[](key, transaction = nil) ⇒ Object
Return the contents of this chest using a given key and transaction.
# File 'lib/archipelago/treasure.rb', line 361
def [](key, transaction = nil)
  join!(transaction)
  instance = ensure_instance_with_transaction(key, transaction)
  return nil unless instance
  if Dubloon === instance
    return instance.join(transaction)
  else
    return Dubloon.new(key, self, transaction, self.service_id)
  end
end
#[]=(key, p1, p2 = nil) ⇒ Object
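Put something into this chest with a given key, value and optional transaction.
With a transaction the call takes the form chest[key, transaction] = value, so p1 is the transaction and p2 is the value; without one, p1 is the value.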
# File 'lib/archipelago/treasure.rb', line 416
def []=(key, p1, p2 = nil)
  if p2
    value = p2
    transaction = p1
  else
    value = p1
    transaction = nil
  end
  return set(key, value, transaction)
end
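Ruby passes everything inside the brackets first and the assigned value last, which is why the method above treats p1 as the transaction when two arguments arrive. The following sketch reproduces only that argument handling; ArgumentOrderDemo is a hypothetical recorder class, and the symbol :tx1 merely stands in for a transaction object.

```ruby
# Hypothetical recorder class; it only mirrors the argument handling shown above.
class ArgumentOrderDemo
  attr_reader :calls

  def initialize
    @calls = []
  end

  def []=(key, p1, p2 = nil)
    if p2
      value = p2
      transaction = p1
    else
      value = p1
      transaction = nil
    end
    @calls << { key: key, value: value, transaction: transaction }
  end
end

demo = ArgumentOrderDemo.new
demo[:gold] = 100          # no transaction: p1 is the value
demo[:gold, :tx1] = 250    # transaction goes inside the brackets, value comes last
demo.calls.each { |call| p call }
```

One consequence of the `if p2` test is that assigning nil or false together with a transaction is read as the one-argument form, so the transaction would be stored as the value.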
#abort!(transaction) ⇒ Object
Abort transaction in this Chest.
# File 'lib/archipelago/treasure.rb', line 447
def abort!(transaction)
  assert_transaction(transaction)
  snapshot = @snapshot_by_transaction[transaction]
  #
  # Make sure nobody can modify this transaction while we
  # are aborting it.
  #
  snapshot.synchronize do
    serialized_transaction = Marshal.dump(transaction)
    #
    # If this transaction was successfully prepared
    #
    if @snapshot_by_transaction_db.include?(serialized_transaction)
      #
      # Unlock the keys that are part of it.
      #
      snapshot.each do |key, value|
        @db.unlock_on(key)
      end
      #
      # And remove it from persistent storage.
      #
      @snapshot_by_transaction_db[serialized_transaction] = nil
      #
      # And remove its timestamps from persistent storage.
      # (The receiver is missing from the extracted listing;
      # @timestamp_by_key_by_transaction_db is an assumed name.)
      #
      @timestamp_by_key_by_transaction_db[serialized_transaction] = nil
    end
    #
    # Finally delete it from the snapshots.
    #
    @snapshot_by_transaction.delete(transaction)
    #
    # And from the timestamps.
    # (Receiver name assumed, as above.)
    #
    @timestamp_by_key_by_transaction.delete(transaction)
  end
end
#active_transactions ⇒ Object
The transactions active in this Chest.
# File 'lib/archipelago/treasure.rb', line 303
def active_transactions
  @snapshot_by_transaction.keys.clone
end
#call_instance_method(key, method, transaction, *arguments, &block) ⇒ Object
Call an instance method on whatever this chest holds at key with any transaction and args.
If a transaction is provided and the value for the key respond_to?(:with_transaction) then the actual method call will be wrapped within the block sent to with_transaction(transaction, &block).
# File 'lib/archipelago/treasure.rb', line 436
def call_instance_method(key, method, transaction, *arguments, &block)
  if transaction
    return call_with_transaction(key, method, transaction, *arguments, &block)
  else
    return call_without_transaction(key, method, *arguments, &block)
  end
end
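The with_transaction wrapping described above can be sketched without the library. Only the hook name with_transaction comes from the text; the Ledger class and the call_on helper are hypothetical, and the dispatch logic is a simplified assumption about what call_with_transaction does, not a copy of it.

```ruby
# Hypothetical value class; only the with_transaction hook name comes from the text above.
class Ledger
  attr_reader :log

  def initialize
    @log = []
  end

  # Called around the real method when a transaction is present.
  def with_transaction(transaction)
    @log << "enter #{transaction}"
    result = yield
    @log << "leave #{transaction}"
    result
  end

  def deposit(amount)
    @log << "deposit #{amount}"
    amount
  end
end

# Assumed dispatch logic, simplified from the description.
def call_on(instance, method, transaction, *arguments, &block)
  if transaction && instance.respond_to?(:with_transaction)
    instance.with_transaction(transaction) do
      instance.public_send(method, *arguments, &block)
    end
  else
    instance.public_send(method, *arguments, &block)
  end
end

ledger = Ledger.new
call_on(ledger, :deposit, :tx1, 10)   # wrapped in with_transaction
call_on(ledger, :deposit, nil, 5)     # called directly
puts ledger.log
```

A stored object that defines with_transaction gets a chance to set up and tear down per-transaction state around each call, while objects without the hook are called as usual.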
#commit!(transaction) ⇒ Object
Commits transaction in this Chest.
NB: The transaction must be prepared before commit! is called; otherwise an IllegalCommitException is raised.
# File 'lib/archipelago/treasure.rb', line 550
def commit!(transaction)
  assert_transaction(transaction)
  raise IllegalCommitException.new(self, transaction) unless @snapshot_by_transaction_db.include?(Marshal.dump(transaction))
  snapshot = @snapshot_by_transaction[transaction]
  #
  # Make sure nobody can modify this transaction while we are
  # committing it.
  #
  snapshot.synchronize do
    #
    # Copy each key and value from our private space to the real space
    #
    snapshot.each do |key, value|
      if value == :deleted
        @db.delete(key)
      else
        @db[key] = value
      end
    end
    #
    # Call abort! to clean up after the transaction.
    #
    abort!(transaction)
  end
end
#delete(key, transaction = nil) ⇒ Object
Delete the value of key within transaction.
# File 'lib/archipelago/treasure.rb', line 377
def delete(key, transaction = nil)
  join!(transaction)
  rval = nil
  if transaction
    #
    # If we have a transaction we must note that it is deleted in a
    # separate space for that transaction.
    #
    snapshot = @snapshot_by_transaction[transaction]
    snapshot.synchronize do
      rval = snapshot[key]
      snapshot[key] = :deleted
      #
      # Make sure we remember when it was last changed according to our main db.
      # (Several identifiers are missing from the extracted listing;
      # timestamps, @timestamp_by_key_by_transaction and @db.timestamp
      # are assumed names.)
      #
      timestamps = @timestamp_by_key_by_transaction[transaction]
      timestamps[key] = @db.timestamp(key) unless timestamps.include?(key)
    end
  else
    #
    # Otherwise just ask our persistence provider to delete it.
    #
    @db.delete(key)
  end
  rval.freeze
  return rval
end
#each(callable) ⇒ Object
Will do callable.call(key, value) for each key-and-value pair in this Chest.
NB: This is totally thread-unsafe; only use it for management or rescue!
# File 'lib/archipelago/treasure.rb', line 314
def each(callable)
  @db.each(callable)
end
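Note that each takes its callable as an ordinary argument rather than as a block. A minimal sketch of that calling convention, using a hypothetical in-memory TinyStore in place of the persistent store:

```ruby
# Hypothetical in-memory stand-in; the real Chest delegates to its persistent store.
class TinyStore
  def initialize(data)
    @db = data
  end

  # Invoke callable.call(key, value) once per stored pair.
  def each(callable)
    @db.each { |key, value| callable.call(key, value) }
  end
end

store = TinyStore.new("gold" => 3, "rum" => 7)
total = 0
store.each(lambda { |_key, value| total += value })
puts total
```

Anything that responds to call with two arguments works here, including a lambda, a proc or a Method object.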
#evaluate!(label, timestamp, data) ⇒ Object
Evaluate data if we have not already seen label, or if the timestamp recorded for label is earlier than the one given.
# File 'lib/archipelago/treasure.rb', line 321
def evaluate!(label, timestamp, data)
  serialized_label = Marshal.dump(label)
  go = false
  if @seen_data.include?(serialized_label)
    # (The first local name is missing from the extracted listing;
    # last_timestamp is an assumed name.)
    last_timestamp, last_data = Marshal.load(@seen_data[serialized_label])
    go = true if timestamp > last_timestamp
  else
    go = true
  end
  if go
    begin
      Object.class_eval(data)
      @seen_data[serialized_label] = Marshal.dump([timestamp, data])
    rescue Exception => e
      # Record the failed attempt. The listing calls Marshal.load here,
      # which cannot serialize an array; Marshal.dump is used instead.
      @seen_data[serialized_label] = Marshal.dump([timestamp, nil])
      puts e
      pp e.backtrace
    end
  end
end
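The rule that a label is re-evaluated only when a newer timestamp arrives can be isolated in a few lines. This is a hypothetical in-memory sketch: SeenData and offer are invented names, nothing is evaluated or persisted, and only the accept-or-skip decision is modelled.

```ruby
# Hypothetical in-memory version of the "newer timestamp wins" rule.
class SeenData
  def initialize
    @seen = {}   # label => [timestamp, data]
  end

  # Returns true when data was accepted, false when it was skipped.
  def offer(label, timestamp, data)
    last_timestamp, _last_data = @seen[label]
    return false if last_timestamp && timestamp <= last_timestamp
    @seen[label] = [timestamp, data]
    true
  end
end

seen = SeenData.new
t0 = Time.at(1_000)
p seen.offer(:patch, t0, "v1")         # first time the label is seen
p seen.offer(:patch, t0, "v1 again")   # same timestamp, skipped
p seen.offer(:patch, t0 + 60, "v2")    # newer timestamp, accepted
```

Keying on the label makes repeated deliveries of the same code harmless, and the timestamp lets a later version replace an earlier one.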
#include?(key, transaction = nil) ⇒ Boolean
Returns true if this Chest includes the given key, optionally within a transaction.
# File 'lib/archipelago/treasure.rb', line 347
def include?(key, transaction = nil)
  join!(transaction)
  if transaction
    return @snapshot_by_transaction[transaction].include?(key)
  else
    return @db.include?(key)
  end
end
#prepare!(transaction) ⇒ Object
Prepares transaction in this Chest.
NB: This will cause any update of the data within this transaction to block until it is either aborted or committed!
# File 'lib/archipelago/treasure.rb', line 495
def prepare!(transaction)
  assert_transaction(transaction)
  #
  # If we don't know about this transaction then it can't very well
  # affect us.
  #
  return :unchanged unless @snapshot_by_transaction.include?(transaction)
  snapshot = @snapshot_by_transaction[transaction]
  #
  # Make sure nobody can modify this transaction while we are
  # preparing it.
  #
  snapshot.synchronize do
    #
    # Remember what locks we acquire so that we can
    # unlock them in case of failure.
    #
    locks = []
    # (Several identifiers are missing from the extracted listing;
    # timestamps, @timestamp_by_key_by_transaction,
    # @timestamp_by_key_by_transaction_db and @db.timestamp
    # are assumed names.)
    timestamps = @timestamp_by_key_by_transaction[transaction]
    #
    # Acquire a lock on each key in the transaction
    #
    snapshot.each do |key, value|
      if @db.timestamp(key) == timestamps[key]
        @db.lock_on(key)
        locks << key
      else
        locks.each do |locked_key|
          @db.unlock_on(locked_key)
        end
        return :abort
      end
    end
    serialized_transaction = Marshal.dump(transaction)
    #
    # Dump its state to persistent storage.
    #
    @snapshot_by_transaction_db[serialized_transaction] = Marshal.dump(snapshot)
    #
    # And dump its timestamps to persistent storage
    #
    @timestamp_by_key_by_transaction_db[serialized_transaction] = Marshal.dump(timestamps)
    return :prepared
  end
end
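The prepare step above is the heart of the optimistic locking: a transaction may lock its keys only if none of them changed since the transaction first looked at them. The sketch below shows that idea in a single process. VersionedStore is hypothetical, it uses per-key version counters where the Chest compares timestamps, and it omits persistence and crash recovery.

```ruby
# Hypothetical, single-process sketch of optimistic locking; not Archipelago's implementation.
class VersionedStore
  def initialize
    @values = {}
    @versions = Hash.new(0)   # key => number of committed writes
    @locked = {}              # key => transaction holding the lock
  end

  def read(key)
    @values[key]
  end

  def version(key)
    @versions[key]
  end

  # A transaction is a hash: { seen: {key => version}, writes: {key => value} }.
  def prepare(transaction)
    acquired = []
    transaction[:writes].each_key do |key|
      unchanged = @versions[key] == transaction[:seen][key]
      if unchanged && !@locked.key?(key)
        @locked[key] = transaction
        acquired << key
      else
        # Release whatever we locked so far and give up.
        acquired.each { |locked_key| @locked.delete(locked_key) }
        return :abort
      end
    end
    :prepared
  end

  # Apply the writes of a prepared transaction and release its locks.
  def commit(transaction)
    transaction[:writes].each do |key, value|
      @values[key] = value
      @versions[key] += 1
      @locked.delete(key)
    end
  end
end

store = VersionedStore.new
tx1 = { seen: { gold: store.version(:gold) }, writes: { gold: 10 } }
tx2 = { seen: { gold: store.version(:gold) }, writes: { gold: 99 } }

p store.prepare(tx1)   # nothing changed since tx1 looked
store.commit(tx1)
p store.prepare(tx2)   # tx2's view of :gold is now stale
p store.read(:gold)
```

No lock is held while a transaction is doing its work; conflicts are detected only at prepare time, and the losing transaction has to abort and retry.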