Class: Archipelago::Tranny::Transaction

Inherits:
Object
Includes:
Current::Synchronized
Defined in:
lib/archipelago/tranny.rb

Overview

A transaction managed by the manager.

A transaction can have the following states:

  • :active - When it is first created.

    • This transaction can be committed or aborted.

  • :voting - When it has started the two-phase commit and the voting has begun.

    • This transaction can be aborted.

  • :commited - After everyone has voted and every vote was either :unchanged or :commit.

    • This transaction can not be changed.

  • :aborted - After someone has called abort! or voted :abort in a vote!

    • This transaction can not be changed.
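
The state list above can be read as a small table of the operations each state still permits. The following is a minimal sketch under that reading, not code from Archipelago; ALLOWED_OPERATIONS and allowed? are hypothetical names introduced for illustration.

```ruby
# Hypothetical lookup table derived from the state list above.
# The :commited spelling matches the symbol used by the library.
ALLOWED_OPERATIONS = {
  :active   => [:commit!, :abort!],
  :voting   => [:abort!],
  :commited => [],
  :aborted  => []
}.freeze

# Returns true if the given operation is permitted in the given state.
# Raises KeyError for a state that is not in the table.
def allowed?(state, operation)
  ALLOWED_OPERATIONS.fetch(state).include?(operation)
end
```

For example, allowed?(:voting, :abort!) is true, while allowed?(:voting, :commit!) is false.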

Anyone that wants to join the transaction must implement the following methods:

  • abort!(transaction): Abort the provided transaction.

  • commit!(transaction): Commit the provided transaction.

  • prepare!(transaction): Prepare for committing the provided transaction.

abort! and commit! should not return any values, but can raise exceptions if required.

prepare! must return :abort, :commit or :unchanged, depending on what the member is prepared to do. A member returns :unchanged only when it has not changed state during the transaction; this means that it does not require any further notification on the progress of the transaction.

A member that has returned :commit on the prepare! must store the transaction proxy in a persistent manner to be able to connect to the manager and get a new copy of the transaction in case of communications failure.
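
As an illustration of the member contract described above, here is a minimal sketch of a participant. CounterMember, its stage method, its validation rule and its in-memory @prepared hash are all assumptions made for this example; a real member would persist the transaction proxy rather than keep it in memory.

```ruby
# A hypothetical member; the class and its fields are illustrative only.
class CounterMember
  attr_reader :value

  def initialize(value = 0)
    @value = value
    @pending = {}   # transaction => proposed new value
    @prepared = {}  # stand-in for persistent storage of transaction proxies
  end

  # Stage a change that is applied only if the transaction commits.
  def stage(transaction, new_value)
    @pending[transaction] = new_value
  end

  # Vote on the transaction: :unchanged, :abort or :commit.
  def prepare!(transaction)
    return :unchanged unless @pending.key?(transaction)
    return :abort if @pending[transaction] < 0 # example validation rule
    @prepared[transaction] = true # a real member would persist the proxy here
    :commit
  end

  # Apply the staged change. Returns nothing of interest.
  def commit!(transaction)
    @value = @pending.delete(transaction) if @pending.key?(transaction)
    @prepared.delete(transaction)
    nil
  end

  # Discard the staged change. Returns nothing of interest.
  def abort!(transaction)
    @pending.delete(transaction)
    @prepared.delete(transaction)
    nil
  end
end
```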

A member that reconnects to a crashed transaction manager should not abort! the transaction if it is still in the :active state. If the manager or the member really did crash, the manager will not be able to prepare! the disconnected member, so the transaction will be aborted on commit! anyway. If the disconnect was just a temporary networking problem, the transaction will continue as planned.

A member that reconnects to a crashed transaction manager where the transaction is in :voting state should just wait around and see if it gets prepare! called. In case of temporary network failure the transaction will continue as planned, otherwise it will abort! automatically.

A member that reconnects to a disconnected transaction manager where the transaction is in :commited state should just commit its state. Then it must notify the transaction using report_commited! so that the transaction can disappear gracefully.

A member that reconnects to a disconnected transaction manager that either doesn't know of the transaction or returns an :aborted transaction may safely abort the state change and forget about the transaction.
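
The reconnection rules above amount to a decision on the observed transaction state. The sketch below expresses that decision as a function; recovery_action and the action symbols it returns are hypothetical names, not part of Archipelago.

```ruby
# Hypothetical helper: maps what a reconnecting member observes to the
# action described above. `state` is the transaction's state, or nil if
# the manager no longer knows the transaction.
def recovery_action(state)
  case state
  when :active       then :continue          # do not abort!; carry on as planned
  when :voting       then :wait_for_prepare  # prepare! may still arrive
  when :commited     then :commit_and_report # commit locally, then call report_commited!
  when :aborted, nil then :abort_locally     # drop the state change and forget the transaction
  else raise ArgumentError, "unknown transaction state: #{state.inspect}"
  end
end
```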


Methods included from Current::Synchronized

#lock_on, #mon_check_owner, #synchronize_on, #unlock_on

Constructor Details

#initialize(options) ⇒ Transaction

Create a transaction managed by the provided manager.

Uses options[:manager] as its TransactionManager and times out after options[:timeout] seconds.



# File 'lib/archipelago/tranny.rb', line 326

def initialize(options)
  super()
  #
  # A hash where members are keys and their state the values.
  #
  @members = {}
  @members.extend(Archipelago::Current::Synchronized)
  #
  # We are alive!
  #
  @state = :active
  #
  # We have a timeout!
  #
  @timeout = options[:timeout]
  #
  # We are unique!
  # 
  @transaction_id = "#{options[:manager].service_id}:#{Time.new.to_f}:#{self.object_id}:#{rand(1 << 32)}"
  #
  # We have a birth time!
  #
  @created_at = Time.now
  #
  # We have a manager!
  #
  self.manager = options[:manager]

  store_us_for_future_reference!

  start_timeout_thread
end

Instance Attribute Details

#manager ⇒ Object

Returns the value of attribute manager.



# File 'lib/archipelago/tranny.rb', line 318

def manager
  @manager
end

#proxy ⇒ Object (readonly)

Returns the value of attribute proxy.



# File 'lib/archipelago/tranny.rb', line 318

def proxy
  @proxy
end

#state ⇒ Object (readonly)

Returns the value of attribute state.



# File 'lib/archipelago/tranny.rb', line 318

def state
  @state
end

#transaction_id ⇒ Object (readonly)

Returns the value of attribute transaction_id.



# File 'lib/archipelago/tranny.rb', line 318

def transaction_id
  @transaction_id
end

Class Method Details

._load(s) ⇒ Object



# File 'lib/archipelago/tranny.rb', line 392

def self._load(s)
  members, state, timeout, transaction_id, created_at = Marshal.load(s)
  rval = self.allocate
  rval.instance_variable_set(:@members, members)
  rval.instance_variable_set(:@state, state)
  rval.instance_variable_set(:@timeout, timeout)
  rval.instance_variable_set(:@transaction_id, transaction_id)
  rval.instance_variable_set(:@created_at, created_at)
  rval
end

Instance Method Details

#_dump(l) ⇒ Object

Special dump call that does NOT dump our manager or proxy, since DRbObjects don't take kindly to being restored in an environment where they are known to be invalid.



# File 'lib/archipelago/tranny.rb', line 382

def _dump(l)
  return Marshal.dump([
                       @members,
                       @state,
                       @timeout,
                       @transaction_id,
                       @created_at
                      ])
end
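
The _dump/_load pair uses Ruby's custom Marshal hooks to serialize only part of an object. The following self-contained sketch shows the same technique on a hypothetical Snapshot class, whose @connection field stands in for a DRbObject that must not be dumped; none of these names come from Archipelago.

```ruby
# Hypothetical class demonstrating selective marshalling.
class Snapshot
  attr_reader :id, :state, :connection

  def initialize(id, state, connection)
    @id = id
    @state = state
    @connection = connection # stands in for a DRbObject; deliberately not dumped
  end

  # Called by Marshal.dump; the argument is the remaining depth limit.
  # Must return a String.
  def _dump(_depth)
    Marshal.dump([@id, @state])
  end

  # Called by Marshal.load with the String produced by _dump.
  def self._load(data)
    id, state = Marshal.load(data)
    obj = allocate # skip initialize, as the listing above does
    obj.instance_variable_set(:@id, id)
    obj.instance_variable_set(:@state, state)
    obj
  end
end

original = Snapshot.new("tx-1", :active, Object.new)
copy = Marshal.load(Marshal.dump(original))
```

After the round trip, copy.id and copy.state match the original, while copy.connection is nil because it was never dumped. A restored object therefore needs its live references reattached before use.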

#abort! ⇒ Object

Abort the transaction, sending all participants the abort! message.



# File 'lib/archipelago/tranny.rb', line 451

def abort!
  synchronize do
    raise IllegalOperationException.new(:abort!, self) if [:commited, :aborted].include?(self.state)
    
    #
    # Set our state.
    #
    @state = :aborted

    store_us_for_future_reference!

    #
    # Abort all members.
    #
    threads = []
    @members.clone.each do |member, state|
      raise RuntimeException.new("This is not supposed to be possible, but we are in abort! with member " +
                                 "#{member.inspect} in state #{state.inspect}") if state == :commited
      if state != :aborted
        threads << Thread.new do 
          begin
            member.abort!(self.proxy)
            @members.synchronize do
              @members[member] = :aborted
            end
          rescue Exception => e
            @manager.log_error(e)
            #
            # We must not let the other members stop just
            # because one member failed. No more Mr Nice Guy!
            #
          end
        end
      end
    end
    
    #
    # Wait for all members to finish.
    #
    threads.each do |thread|
      thread.join
    end

    #
    # No use having aborted transactions lying about.
    #
    # NB: This means that disconnected members that cant
    # find their old transactions will have to presume they
    # have been aborted.
    #
    remove_us_we_are_redundant!
  end
  nil
end
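
The fan-out in the listing above, one thread per member with failures isolated so that they cannot stop the others, can be shown in isolation. This is a minimal sketch rather than the library's code: abort_all is a hypothetical helper, it collects errors instead of logging them through a manager, and it rescues StandardError rather than Exception.

```ruby
require 'monitor'

# Hypothetical helper: call abort! on every member in its own thread,
# recording successes and collecting errors instead of stopping early.
def abort_all(members, transaction)
  results = {}
  errors = []
  lock = Monitor.new
  threads = members.map do |member|
    Thread.new do
      begin
        member.abort!(transaction)
        lock.synchronize { results[member] = :aborted }
      rescue StandardError => e
        # One failing member must not prevent the others from aborting.
        lock.synchronize { errors << e }
      end
    end
  end
  threads.each(&:join) # wait for all members to finish
  [results, errors]
end
```

Members that raise are simply absent from the returned results hash, which mirrors how the listing leaves a failed member's state unchanged in @members.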

#commit! ⇒ Object

Commits the transaction, returning the new state (:aborted | :commited)



# File 'lib/archipelago/tranny.rb', line 509

def commit!
  synchronize do
    raise IllegalOperationException.new(:commit!, self) unless self.state == :active

    #
    # Vote for the outcome and act on it
    #
    case vote!
    when :abort
      abort!
    when :commit
      _commit!
    when :unchanged
      @state = :commited
      remove_us_we_are_redundant!
    end

    return @state
  end
  nil
end
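
The vote! method is not shown on this page, so how it combines the members' prepare! results is not confirmed here. One plausible reading, consistent with the state descriptions in the overview, is sketched below; tally is a hypothetical name and the precedence rules are an assumption.

```ruby
# Hypothetical aggregation of prepare! results (an assumption, not the
# library's vote! implementation):
#   any :abort               => :abort
#   otherwise any :commit    => :commit
#   otherwise (all unchanged, or no members) => :unchanged
def tally(votes)
  return :abort if votes.include?(:abort)
  return :commit if votes.include?(:commit)
  :unchanged
end
```

Under this reading, a single :abort vote overrides any number of :commit votes, and the :unchanged branch of commit! is reached only when no member has anything to commit.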

#join(member) ⇒ Object

Join a member to this transaction.

Will raise a JoinCountException if the given member has already joined this transaction.



# File 'lib/archipelago/tranny.rb', line 537

def join(member)
  @members.synchronize do
    raise IllegalOperationException.new(:join, self) unless self.state == :active
    raise JoinCountException.new(member, self) if @members.include?(member)

    @members[member] = :active
  end
  store_us_for_future_reference!
  nil
end

#remove_us_we_are_redundant! ⇒ Object

Removes this transaction from its manager, since it is no longer needed.



# File 'lib/archipelago/tranny.rb', line 430

def remove_us_we_are_redundant!
  @manager.remove_transaction!(self)
  nil
end

#report_commited!(member) ⇒ Object

Used by members that failed during commit to report, after reconnecting, that they have now committed their state.



# File 'lib/archipelago/tranny.rb', line 438

def report_commited!(member)
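  # Exceptions must be instantiated with .new; calling the class name like a
  # method would raise NoMethodError instead of the intended exception.
  raise UnknownMemberException.new(member, self) unless @members.include?(member)
  raise IllegalOperationException.new(:report_commited!, self) unless self.state == :commited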

  @members[member] = :commited
  
  remove_us_if_all_are_commited!
  nil
end

#start_timeout_thread ⇒ Object

Starts the thread that will abort! us automatically after we have lived for @timeout seconds.



# File 'lib/archipelago/tranny.rb', line 407

def start_timeout_thread
  Thread.new do
    now = Time.now
    die_at = @created_at + @timeout
    if die_at > now
      sleep(die_at - now)
    end
    abort!
  end
  nil
end
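
The deadline arithmetic in the listing above (sleep only for the time remaining since creation, then act) can be tried on its own. This is a minimal sketch; run_after_deadline is a hypothetical helper, and the block passed to it stands in for the abort! call.

```ruby
# Hypothetical helper: run the block once `timeout` seconds have passed
# since `created_at`. Returns the Thread so callers can join it.
def run_after_deadline(created_at, timeout, &on_timeout)
  Thread.new do
    remaining = (created_at + timeout) - Time.now # Float seconds
    sleep(remaining) if remaining > 0 # already past the deadline: act at once
    on_timeout.call
  end
end

fired = false
run_after_deadline(Time.now, 0.05) { fired = true }.join
```

Computing the deadline from the creation time rather than from the moment the thread starts means a transaction restored with an old creation time expires when originally scheduled, or immediately if that time has already passed.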

#store_us_for_future_reference! ⇒ Object

Stores this transaction with its manager (via store_transaction!) so that it can be looked up later.



# File 'lib/archipelago/tranny.rb', line 422

def store_us_for_future_reference!
  @manager.store_transaction!(self)
  nil
end