Class: Archipelago::Treasure::Chest
- Inherits: Object
  - Object
    - Archipelago::Treasure::Chest
- Includes: Disco::Publishable
- Defined in: lib/archipelago/treasure.rb
Overview
A possibly remote database that only returns proxies to its contents, and thus runs all methods on its contents itself.
Has support for optimistically locked distributed serializable transactions.
Instance Attribute Summary
- #captain ⇒ Object (readonly)
  Returns the value of attribute captain.
Attributes included from Disco::Camel
Instance Method Summary
- #[](key, transaction = nil) ⇒ Object
  Return the contents of this chest using a given key and transaction.
- #[]=(key, p1, p2 = nil) ⇒ Object
  Put something into this chest with a given key, value and transaction.
- #abort!(transaction) ⇒ Object
  Abort transaction in this Chest.
- #active_transactions ⇒ Object
  The transactions active in this Chest.
- #call_instance_method(key, method, transaction, *arguments, &block) ⇒ Object
  Call an instance method on whatever this chest holds at key with any transaction and args.
- #commit!(transaction) ⇒ Object
  Commits transaction in this Chest.
- #delete(key, transaction = nil) ⇒ Object
  Delete the value of key within transaction.
- #each(callable) ⇒ Object
  Will do callable.call(key, value) for each key-and-value pair in this Chest.
- #evaluate!(label, timestamp, data) ⇒ Object
  Evaluate data if we have not already seen label or if we have an earlier timestamp than the one given.
- #include?(key, transaction = nil) ⇒ Boolean
  Returns true if this Chest includes the given key, optionally within a transaction.
- #initialize(options = {}) ⇒ Chest (constructor)
  Initialize a Chest.
- #notify!(predecessor_id) ⇒ Object
  Our predecessor is making itself public, so make sure we don't know about any keys we shouldn't.
- #prepare!(transaction) ⇒ Object
  Prepares transaction in this Chest.
Methods included from Disco::Publishable
#_dump, append_features, #close!, #publish!, #service_id, #unpublish!, #valid?
Constructor Details
#initialize(options = {}) ⇒ Chest
Initialize a Chest.
Will use a BerkeleyHashishProvider backed by treasure_chest.db in the same directory to get its hashes unless :persistence_directory is given.
Will try to recover crashed transactions every :transaction_recovery_interval seconds, or every TRANSACTION_RECOVERY_INTERVAL seconds if none is given.
Will store the actual data in a remote Archipelago::Dump network if :officer is given or in a local hash if not.
Will use Archipelago::Disco::Publishable by calling initialize_publishable with options.
# File 'lib/archipelago/treasure.rb', line 327
def initialize(options = {})
  #
  # Use the given options to initialize the publishable
  # instance variables.
  #
  initialize_publishable(options)

  #
  # [transaction => [key => instance]]
  # To know what stuff is visible to a transaction.
  #
  @snapshot_by_transaction = {}
  @snapshot_by_transaction.extend(Archipelago::Current::Synchronized)

  #
  # [transaction => [key => when the key was read/updated/deleted]
  # To know if a transaction is ok to prepare and commit.
  #
  = {}

  #
  # [transaction => [key => instance]]
  # To know what transactions were prepared but not
  # properly finished last run.
  #
  @crashed = Set.new

  initialize_seen_data

  #
  # The magical persistent map that defines how we actually
  # store our data.
  #
  if options[:officer]
    @db = @persistence_provider.get_cached_hashish(:officer => options[:officer])
  else
    @db = @persistence_provider.get_cached_hashish(:name => "db")
  end

  initialize_prepared(options[:transaction_recovery_interval] || TRANSACTION_RECOVERY_INTERVAL)

  CHEST_BY_SERVICE_ID[self.service_id] = self

  #
  # The provider of Chest index and other magic.
  #
  @captain = options[:captain] || (defined?(Archipelago::Pirate::BLACKBEARD) ? Archipelago::Pirate::BLACKBEARD : Archipelago::Pirate::Captain.new)

  #
  # Our predecessor. We will NOT accept remote calls for keys before this one.
  #
  @predecessor_id = nil
end
Instance Attribute Details
#captain ⇒ Object (readonly)
Returns the value of attribute captain.
# File 'lib/archipelago/treasure.rb', line 311
def captain
  @captain
end
Instance Method Details
#[](key, transaction = nil) ⇒ Object
Return the contents of this chest using a given key and transaction.
# File 'lib/archipelago/treasure.rb', line 456
def [](key, transaction = nil)
  join!(transaction)

  instance = ensure_instance_with_transaction(key, transaction)
  return nil unless instance

  if Dubloon.dubloon?(instance)
    return instance.join(transaction)
  else
    return Dubloon.new(key, self, transaction, self.service_id)
  end
end
#[]=(key, p1, p2 = nil) ⇒ Object
Put something into this chest with a given key, value and transaction.
# File 'lib/archipelago/treasure.rb', line 511
def []=(key, p1, p2 = nil)
  if p2
    value = p2
    transaction = p1
  else
    value = p1
    transaction = nil
  end
  return set(key, value, transaction)
end
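The p1/p2 shuffle in #[]= follows from how Ruby dispatches element assignment: the assigned value is always passed last, so chest[key, transaction] = value arrives as []=(key, transaction, value). The sketch below reproduces only that argument handling. TinyChest and its set helper are hypothetical stand-ins, not part of Archipelago, and the real Chest#set is not shown on this page.

```ruby
# Hypothetical stand-in for Chest; it only demonstrates the argument order.
class TinyChest
  attr_reader :writes

  def initialize
    @writes = []
  end

  # Ruby passes the assigned value as the last argument, so
  # chest[key, tx] = value calls []=(key, tx, value).
  def []=(key, p1, p2 = nil)
    if p2
      value = p2
      transaction = p1
    else
      value = p1
      transaction = nil
    end
    set(key, value, transaction)
  end

  private

  # Assumed helper: records the write instead of storing it anywhere.
  def set(key, value, transaction)
    @writes << [key, value, transaction]
    value
  end
end

chest = TinyChest.new
chest[:gold] = 100          # no transaction
chest[:silver, :tx1] = 50   # inside transaction :tx1
chest[:copper, :tx1] = nil  # p2 is nil, so :tx1 is taken as the value
```

One consequence of testing p2 for truthiness, at least in this sketch: assigning nil or false together with a transaction is indistinguishable from the two-argument form, so the transaction ends up stored as the value.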
#abort!(transaction) ⇒ Object
Abort transaction in this Chest.
# File 'lib/archipelago/treasure.rb', line 544
def abort!(transaction)
  assert_transaction(transaction)

  snapshot = @snapshot_by_transaction[transaction]

  #
  # Make sure nobody can modify this transaction while we
  # are aborting it.
  #
  snapshot.synchronize do

    serialized_transaction = Marshal.dump(transaction)

    #
    # If this transaction was successfully prepared
    #
    if @snapshot_by_transaction_db.include?(serialized_transaction)
      #
      # Unlock the keys that are part of it.
      #
      snapshot.each do |key, value|
        @db.unlock_on(key)
      end
      #
      # And remove it from persistent storage.
      #
      @snapshot_by_transaction_db[serialized_transaction] = nil
      #
      # And remove its timestamps from persistent storage.
      #
      [serialized_transaction] = nil
    end

    #
    # Finally delete it from the snapshots.
    #
    @snapshot_by_transaction.delete(transaction)
    #
    # And from the timestamps.
    #
    .delete(transaction)
  end
end
#active_transactions ⇒ Object
The transactions active in this Chest.
# File 'lib/archipelago/treasure.rb', line 398
def active_transactions
  @snapshot_by_transaction.keys.clone
end
#call_instance_method(key, method, transaction, *arguments, &block) ⇒ Object
Call an instance method on whatever this chest holds at key with any transaction and args.
If a transaction is provided and the value for the key respond_to?(:with_transaction) then the actual method call will be wrapped within the block sent to with_transaction(transaction, &block).
# File 'lib/archipelago/treasure.rb', line 531
def call_instance_method(key, method, transaction, *arguments, &block)
  assert_mine(key)
  if transaction
    return call_with_transaction(key, method, transaction, *arguments, &block)
  else
    return call_without_transaction(key, method, *arguments, &block)
  end
end
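The with_transaction wrapping described for #call_instance_method can be sketched as follows. This is an illustration under assumptions, not the library's call_with_transaction (whose body is not shown on this page): TransactionDispatch and Ledger are hypothetical names, and the stored value is passed in directly instead of being looked up by key.

```ruby
# Hypothetical value that wants to know which transaction it runs under.
class Ledger
  attr_reader :log

  def initialize
    @log = []
  end

  # Runs the given block with bookkeeping around it.
  def with_transaction(transaction)
    @log << [:begin, transaction]
    result = yield
    @log << [:end, transaction]
    result
  end

  def deposit(amount)
    @log << [:deposit, amount]
    amount
  end
end

# Sketch of the dispatch rule: wrap the call only when a transaction is
# given and the value responds to with_transaction.
module TransactionDispatch
  def self.call(instance, method, transaction, *arguments, &block)
    if transaction && instance.respond_to?(:with_transaction)
      instance.with_transaction(transaction) do
        instance.public_send(method, *arguments, &block)
      end
    else
      instance.public_send(method, *arguments, &block)
    end
  end
end

ledger = Ledger.new
wrapped = TransactionDispatch.call(ledger, :deposit, :tx1, 10)
plain = TransactionDispatch.call("abc", :upcase, :tx1)
```

Values that do not implement with_transaction, such as the String here, are simply called directly even when a transaction is supplied.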
#commit!(transaction) ⇒ Object
Commits transaction in this Chest.
NB: Transaction must be prepared before commit is called.
# File 'lib/archipelago/treasure.rb', line 647
def commit!(transaction)
  assert_transaction(transaction)

  raise IllegalCommitException.new(self, transaction) unless @snapshot_by_transaction_db.include?(Marshal.dump(transaction))

  snapshot = @snapshot_by_transaction[transaction]

  #
  # Make sure nobody can modify this transaction while we are
  # commiting it.
  #
  snapshot.synchronize do

    #
    # Copy each key and value from our private space to the real space
    #
    snapshot.each do |key, value|
      @db.without_lock do
        if value == :deleted
          @db.delete(key)
        else
          @db[key] = value
        end
      end
    end

    #
    # Call abort! to clean up after the transaction.
    #
    abort!(transaction)

  end
end
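The copy step inside #commit! treats the symbol :deleted as a tombstone: keys marked that way in the transaction's snapshot are removed from the main store, and every other entry overwrites it. A minimal sketch of that step alone, using plain Hashes in place of the persistent @db and the synchronized snapshot (SnapshotApply is a hypothetical name; locking and the cleanup via abort! are left out):

```ruby
# Sketch: apply a transaction's private snapshot to the shared store.
module SnapshotApply
  def self.apply(db, snapshot)
    snapshot.each do |key, value|
      if value == :deleted
        db.delete(key)     # tombstone: remove the key from the real space
      else
        db[key] = value    # otherwise copy the private value over
      end
    end
    db
  end
end

db = { "a" => 1, "b" => 2 }
snapshot = { "a" => :deleted, "c" => 3 }
SnapshotApply.apply(db, snapshot)
```

Because the marker is an ordinary symbol, a value that is itself :deleted cannot be told apart from a deletion in this scheme.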
#delete(key, transaction = nil) ⇒ Object
Delete the value of key within transaction.
# File 'lib/archipelago/treasure.rb', line 472
def delete(key, transaction = nil)
  join!(transaction)

  rval = nil

  if transaction
    #
    # If we have a transaction we must note that it is deleted in a
    # separate space for that transaction.
    #
    snapshot = @snapshot_by_transaction[transaction]
    snapshot.synchronize do
      rval = snapshot[key]
      snapshot[key] = :deleted
      #
      # Make sure we remember when it was last changed according to our main db.
      #
      = [transaction]
      [key] = @db.(key) unless .include?(key)
    end
  else
    #
    # Otherwise just ask our persistence provider to delete it.
    #
    @db.delete(key)
  end

  rval.freeze

  return rval
end
#each(callable) ⇒ Object
Will do callable.call(key, value) for each key-and-value pair in this Chest.
NB: This is totally thread-unsafe; only use it for management or rescue!
# File 'lib/archipelago/treasure.rb', line 409
def each(callable)
  @db.each(callable)
end
#evaluate!(label, timestamp, data) ⇒ Object
Evaluate data if we have not already seen label or if we have an earlier timestamp than the one given.
# File 'lib/archipelago/treasure.rb', line 416
def evaluate!(label, timestamp, data)
  serialized_label = Marshal.dump(label)
  go = false
  if @seen_data.include?(serialized_label)
    , last_data = Marshal.load(@seen_data[serialized_label])
    go = true if >
  else
    go = true
  end
  if go
    begin
      Object.class_eval(data)
      @seen_data[serialized_label] = Marshal.dump([, data])
    rescue Exception => e
      @seen_data[serialized_label] = Marshal.load([, nil])
      puts e
      pp e.backtrace
    end
  end
end
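The rule stated for #evaluate! (run the data if the label is new, or if the stored timestamp is earlier than the one given) can be sketched on its own. SeenLabels is a hypothetical class: it keeps its records in a plain Hash instead of the persistent @seen_data, and it yields the data to a block instead of calling Object.class_eval, so the example stays harmless to run.

```ruby
# Sketch of the "seen label / newer timestamp" check.
class SeenLabels
  def initialize
    @seen = {}   # label => [timestamp, data]
  end

  # Yields data and records it when the label is new or the given
  # timestamp is later than the stored one. Returns whether it ran.
  def evaluate(label, timestamp, data)
    last_timestamp, _last_data = @seen[label]
    return false if last_timestamp && timestamp <= last_timestamp

    yield data
    @seen[label] = [timestamp, data]
    true
  end
end

seen = SeenLabels.new
ran = []
first = seen.evaluate(:patch, 10, "v1") { |d| ran << d }  # new label
stale = seen.evaluate(:patch, 5, "v0") { |d| ran << d }   # older timestamp
newer = seen.evaluate(:patch, 20, "v2") { |d| ran << d }  # later timestamp
```

Unlike the listing above, this sketch does not record anything when the block raises; how failures should be remembered is left open here.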
#include?(key, transaction = nil) ⇒ Boolean
Returns true if this Chest includes the given key, optionally within a transaction.
# File 'lib/archipelago/treasure.rb', line 442
def include?(key, transaction = nil)
  join!(transaction)

  if transaction
    return @snapshot_by_transaction[transaction].include?(key)
  else
    return @db.include?(key)
  end
end
#notify!(predecessor_id) ⇒ Object
Our predecessor is making itself public, so make sure we don't know about any keys we shouldn't.
# File 'lib/archipelago/treasure.rb', line 385
def notify!(predecessor_id)
  @predecessor_id = predecessor_id
  while (to_check = @db.first) && !between?(to_check.first, @predecessor_id, service_id)
    @db.forget(to_check.first)
  end
  while (to_check = @db.last) && !between?(to_check.first, @predecessor_id, service_id)
    @db.forget(to_check.first)
  end
end
#prepare!(transaction) ⇒ Object
Prepares transaction in this Chest.
NB: This will cause any update of the data within this transaction to block until it is either aborted or committed!
# File 'lib/archipelago/treasure.rb', line 592
def prepare!(transaction)
  assert_transaction(transaction)

  #
  # If we dont know about this transaction then it can't very well
  # affect us.
  #
  return :unchanged unless @snapshot_by_transaction.include?(transaction)

  snapshot = @snapshot_by_transaction[transaction]

  #
  # Make sure nobody can modify this transaction while we are
  # preparing it.
  #
  snapshot.synchronize do
    #
    # Remember what locks we acquire so that we can
    # unlock them in case of failure.
    #
    locks = []
    = [transaction]
    #
    # Acquire a lock on each key in the transaction
    #
    snapshot.each do |key, value|
      if @db.(key) == [key]
        @db.lock_on(key)
        locks << key
      else
        locks.each do |key|
          @db.unlock_on(key)
        end
        return :abort
      end
    end

    serialized_transaction = Marshal.dump(transaction)

    #
    # Dump its state to persistent storage.
    #
    @snapshot_by_transaction_db[serialized_transaction] = Marshal.dump(snapshot)
    #
    # And dump its timestamps to persistent storage
    #
    [serialized_transaction] = Marshal.dump()

    return :prepared
  end
end
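The prepare step is where the optimistic locking happens: each key the transaction touched is checked against what the main store currently holds, and any mismatch releases the locks acquired so far and answers :abort. A rough sketch of that validate-then-lock loop follows. VersionedStore is a hypothetical class, integer version counters stand in for the per-key change markers (the exact identifiers are elided in the listing above), and treating an already locked key as a conflict is a simplification of this sketch only.

```ruby
# Sketch of optimistic validation at prepare time.
class VersionedStore
  attr_reader :locked

  def initialize
    @versions = Hash.new(0)  # key => number of writes so far
    @locked = {}             # key => true while a prepared tx holds it
  end

  def version(key)
    @versions[key]
  end

  # Simulates a committed write by someone else.
  def write(key)
    @versions[key] += 1
  end

  # seen_versions: key => version the transaction observed when it
  # first read or changed the key.
  def prepare(seen_versions)
    acquired = []
    seen_versions.each do |key, seen|
      if @versions[key] == seen && !@locked[key]
        @locked[key] = true
        acquired << key
      else
        # Conflict: undo the locks taken so far and give up.
        acquired.each { |k| @locked.delete(k) }
        return :abort
      end
    end
    :prepared
  end
end

store = VersionedStore.new
store.write(:a)
store.write(:b)
tx1 = { a: store.version(:a), b: store.version(:b) }
tx2 = { a: store.version(:a) }
store.write(:b)                    # :b changes after tx1 observed it
stale_result = store.prepare(tx1)  # :b no longer matches what tx1 saw
fresh_result = store.prepare(tx2)  # :a is unchanged
```

No lock is held while a transaction is merely reading or writing its snapshot; keys are locked only between a successful prepare and the following commit or abort.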