Class: Sourced::Backends::ActiveRecordBackend

Inherits:
Object
  • Object
show all
Defined in:
lib/sourced/backends/active_record_backend.rb

Defined Under Namespace

Classes: CommandRecord, EventRecord, StreamRecord

Constant Summary collapse

PREFIX =
'sourced'

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeActiveRecordBackend

Returns a new instance of ActiveRecordBackend.



50
51
52
53
54
55
56
57
# File 'lib/sourced/backends/active_record_backend.rb', line 50

def initialize
  @serialize_event = method(:serialize_jsonb_event)
  @deserialize_event = method(:deserialize_jsonb_event)
  if EventRecord.connection.class.name == 'ActiveRecord::ConnectionAdapters::SQLite3Adapter'
    @serialize_event = method(:serialize_sqlite_event)
    @deserialize_event = method(:deserialize_sqlite_event)
  end
end

Class Method Details

.installed?Boolean

Returns:

  • (Boolean)


36
37
38
39
40
# File 'lib/sourced/backends/active_record_backend.rb', line 36

def self.installed?
  ActiveRecord::Base.connection.table_exists?(EventRecord.table_name) &&
    ActiveRecord::Base.connection.table_exists?(StreamRecord.table_name) &&
    ActiveRecord::Base.connection.table_exists?(CommandRecord.table_name)
end

.table_prefixObject



32
33
34
# File 'lib/sourced/backends/active_record_backend.rb', line 32

def self.table_prefix
  @table_prefix || PREFIX
end

.table_prefix=(prefix) ⇒ Object



25
26
27
28
29
30
# File 'lib/sourced/backends/active_record_backend.rb', line 25

def self.table_prefix=(prefix)
  @table_prefix = prefix
  EventRecord.table_name = [prefix, '_events'].join
  StreamRecord.table_name = [prefix, '_streams'].join
  CommandRecord.table_name = [prefix, '_commands'].join
end

.uninstall!Object



42
43
44
45
46
47
48
# File 'lib/sourced/backends/active_record_backend.rb', line 42

def self.uninstall!
  raise 'Not in test environment' unless ENV['ENVIRONMENT'] == 'test'

  ActiveRecord::Base.connection.drop_table(EventRecord.table_name)
  ActiveRecord::Base.connection.drop_table(CommandRecord.table_name)
  ActiveRecord::Base.connection.drop_table(StreamRecord.table_name)
end

Instance Method Details

#append_events(events) ⇒ Object



130
131
132
133
134
135
136
# File 'lib/sourced/backends/active_record_backend.rb', line 130

def append_events(events)
  rows = events.map { |e| serialize_event(e) }
  EventRecord.insert_all!(rows)
  true
rescue ActiveRecord::RecordNotUnique => e
  raise Sourced::ConcurrentAppendError, e.message
end

#clear!Object



61
62
63
64
65
66
67
# File 'lib/sourced/backends/active_record_backend.rb', line 61

def clear!
  raise 'Not in test environment' unless ENV['ENVIRONMENT'] == 'test'

  EventRecord.delete_all
  CommandRecord.delete_all
  StreamRecord.delete_all
end

#installed?Boolean

Returns:

  • (Boolean)


59
# File 'lib/sourced/backends/active_record_backend.rb', line 59

def installed? = self.class.installed?

#read_event_batch(causation_id) ⇒ Object



138
139
140
141
142
# File 'lib/sourced/backends/active_record_backend.rb', line 138

def read_event_batch(causation_id)
  EventRecord.where(causation_id:).order(:global_seq).map do |record|
    deserialize_event(record)
  end
end

#read_event_stream(stream_id, after: nil, upto: nil) ⇒ Object



144
145
146
147
148
149
150
151
# File 'lib/sourced/backends/active_record_backend.rb', line 144

def read_event_stream(stream_id, after: nil, upto: nil)
  query = EventRecord.where(stream_id:)
  query = query.where('seq > ?', after) if after
  query = query.where('seq <= ?', upto) if upto
  query.order(:global_seq).map do |record|
    deserialize_event(record)
  end
end

#reserve_nextObject

 TODO: locking the stream could be done in a single SQL query, or using an SQL function.



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/sourced/backends/active_record_backend.rb', line 93

def reserve_next(&)
  command_record = transaction do
    cmd = CommandRecord
      .joins("INNER JOIN #{StreamRecord.table_name} ON #{CommandRecord.table_name}.stream_id = #{StreamRecord.table_name}.stream_id")
      .where(["#{StreamRecord.table_name}.locked = ?", false])
      .order("#{CommandRecord.table_name}.id ASC")
      .lock # "FOR UPDATE"
      .first

    if cmd
      StreamRecord.where(stream_id: cmd.stream_id).update(locked: true)
    end
    cmd
  end

  cmd = nil
  if command_record
    # TODO: find out why #data isn't already
    # deserialized here
    data = JSON.parse(command_record.data, symbolize_names: true)
    cmd = Message.from(data)
    yield cmd
    # Only delete the command if processing didn't raise
    command_record.destroy
  end
  cmd
ensure
  # Always unlock the stream
  if command_record
    StreamRecord.where(stream_id: command_record.stream_id).update(locked: false)
  end
end

#schedule_command(stream_id, command) ⇒ Object



84
85
86
87
88
89
# File 'lib/sourced/backends/active_record_backend.rb', line 84

def schedule_command(stream_id, command)
  CommandRecord.transaction do
    StreamRecord.upsert({ stream_id: }, unique_by: :stream_id)
    CommandRecord.create!(stream_id:, data: command.to_json)
  end
end

#schedule_commands(commands) ⇒ Object

 TODO: if all commands belong to the same stream_id we could upsert the streams table once here



71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/sourced/backends/active_record_backend.rb', line 71

def schedule_commands(commands)
  return false if commands.empty?

  # TODO: here we could use multi_insert
  # for both streams and commands
  CommandRecord.transaction do
    commands.each do |command|
      schedule_command(command.stream_id, command)
    end
  end
  true
end

#transactionObject



126
127
128
# File 'lib/sourced/backends/active_record_backend.rb', line 126

def transaction(&)
  EventRecord.transaction(&)
end