Class: Kafka::TransactionStateMachine

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/transaction_state_machine.rb

Defined Under Namespace

Classes: InvalidStateError, InvalidTransitionError

Constant Summary collapse

# Every state the machine can occupy. Each element is assigned to a
# same-named constant as the array literal is evaluated, so callers can refer
# to READY, ERROR, ... directly. Frozen so the set of states cannot be altered
# at runtime.
STATES = [
  UNINITIALIZED          = :uninitialized,
  READY                  = :ready,
  IN_TRANSACTION         = :in_transaction,
  COMMITTING_TRANSACTION = :committing_transaction,
  ABORTING_TRANSACTION   = :aborting_transaction,
  ERROR                  = :error
].freeze
# Allowed transitions, keyed by the TARGET state. The value lists the states
# from which that target may be entered (transition_to! looks up
# TRANSITIONS[next_state] and checks that it includes the current state).
# The table and its lists are frozen so the rules cannot be altered at runtime.
TRANSITIONS = {
  UNINITIALIZED          => [READY, ERROR].freeze,
  READY                  => [UNINITIALIZED, COMMITTING_TRANSACTION, ABORTING_TRANSACTION].freeze,
  IN_TRANSACTION         => [READY].freeze,
  COMMITTING_TRANSACTION => [IN_TRANSACTION].freeze,
  ABORTING_TRANSACTION   => [IN_TRANSACTION].freeze,
  # Any state can transition to the error state
  ERROR                  => STATES
}.freeze

Instance Method Summary collapse

Constructor Details

#initialize(logger:) ⇒ TransactionStateMachine

Returns a new instance of TransactionStateMachine.



27
28
29
30
31
# File 'lib/kafka/transaction_state_machine.rb', line 27

# Builds a state machine whose initial state is UNINITIALIZED.
#
# @param logger [Object] handed to TaggedLogger.new; the wrapped logger is
#   kept in @logger
def initialize(logger:)
  @logger = TaggedLogger.new(logger)
  @mutex  = Mutex.new
  @state  = UNINITIALIZED
end

Instance Method Details

#aborting_transaction? ⇒ Boolean

Returns:

  • (Boolean)


58
59
60
# File 'lib/kafka/transaction_state_machine.rb', line 58

# Asks in_state? about ABORTING_TRANSACTION and returns its answer.
# NOTE(review): in_state? is defined outside this view; presumably it compares
# the argument with the current state — confirm.
#
# @return [Boolean]
def aborting_transaction?
  in_state?(ABORTING_TRANSACTION)
end

#committing_transaction? ⇒ Boolean

Returns:

  • (Boolean)


54
55
56
# File 'lib/kafka/transaction_state_machine.rb', line 54

# Asks in_state? about COMMITTING_TRANSACTION and returns its answer.
# NOTE(review): in_state? is defined outside this view; presumably it compares
# the argument with the current state — confirm.
#
# @return [Boolean]
def committing_transaction?
  in_state?(COMMITTING_TRANSACTION)
end

#error? ⇒ Boolean

Returns:

  • (Boolean)


62
63
64
# File 'lib/kafka/transaction_state_machine.rb', line 62

# Asks in_state? about ERROR and returns its answer.
# NOTE(review): in_state? is defined outside this view; presumably it compares
# the argument with the current state — confirm.
#
# @return [Boolean]
def error?
  in_state?(ERROR)
end

#in_transaction? ⇒ Boolean

Returns:

  • (Boolean)


50
51
52
# File 'lib/kafka/transaction_state_machine.rb', line 50

# Asks in_state? about IN_TRANSACTION and returns its answer.
# NOTE(review): in_state? is defined outside this view; presumably it compares
# the argument with the current state — confirm.
#
# @return [Boolean]
def in_transaction?
  in_state?(IN_TRANSACTION)
end

#ready? ⇒ Boolean

Returns:

  • (Boolean)


46
47
48
# File 'lib/kafka/transaction_state_machine.rb', line 46

# Asks in_state? about READY and returns its answer.
# NOTE(review): in_state? is defined outside this view; presumably it compares
# the argument with the current state — confirm.
#
# @return [Boolean]
def ready?
  in_state?(READY)
end

#transition_to!(next_state) ⇒ Object

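Raises:

  • (InvalidStateError) — if next_state is not one of STATES
  • (InvalidTransitionError) — if the current state is not allowed to move to next_state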

33
34
35
36
37
38
39
40
# File 'lib/kafka/transaction_state_machine.rb', line 33

# Moves the machine to +next_state+.
#
# TRANSITIONS is keyed by the target state; its value lists the states from
# which that target may be entered.
#
# @param next_state [Symbol] the state to enter; must be a member of STATES
# @return [Symbol] +next_state+
# @raise [InvalidStateError] if +next_state+ is not a member of STATES
# @raise [InvalidTransitionError] if the current state is not listed in
#   TRANSITIONS[next_state]; the state is left unchanged
def transition_to!(next_state)
  raise InvalidStateError unless STATES.include?(next_state)

  # Validate and assign under the same lock: checking @state outside the
  # mutex would let another thread change it between the check and the write,
  # allowing a transition that TRANSITIONS forbids.
  @mutex.synchronize do
    unless TRANSITIONS[next_state].include?(@state)
      raise InvalidTransitionError, "Could not transition from state '#{@state}' to state '#{next_state}'"
    end
    @state = next_state
  end

  # Log only after the change has actually happened, and outside the lock so
  # logger I/O does not extend the critical section.
  @logger.debug("Transaction state changed to '#{next_state}'!")
  next_state
end

#uninitialized? ⇒ Boolean

Returns:

  • (Boolean)


42
43
44
# File 'lib/kafka/transaction_state_machine.rb', line 42

# Asks in_state? about UNINITIALIZED and returns its answer.
# NOTE(review): in_state? is defined outside this view; presumably it compares
# the argument with the current state — confirm.
#
# @return [Boolean]
def uninitialized?
  in_state?(UNINITIALIZED)
end