Class: Euston::EventStore::OptimisticEventStream

Inherits:
Object
  • Object
show all
Defined in:
lib/euston-eventstore/optimistic_event_stream.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ OptimisticEventStream

Returns a new instance of OptimisticEventStream.



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/euston-eventstore/optimistic_event_stream.rb', line 4

# Builds a stream either from a snapshot or from a stream id with an optional
# revision range.
#
# The options hash is only read; it is never modified, so callers may pass a
# frozen or shared hash.
#
# @param options [Hash] recognised keys:
#   :persistence  - object queried via get_from(stream_id, min_revision, max_revision)
#   :snapshot     - when this key is present, the stream id and the starting
#                   revision are taken from the snapshot (responds to
#                   stream_id and stream_revision)
#   :stream_id    - stream identifier, used when no snapshot is given
#   :min_revision - lower revision bound, used when no snapshot is given
#   :max_revision - upper revision bound
# @raise [StreamNotFoundError] when a positive :min_revision was requested
#   and no committed events are present after loading
def initialize(options)
  @persistence = options[:persistence]
  @committed_events = []
  @committed_headers = {}
  @uncommitted_commands = []
  @uncommitted_events = []
  @uncommitted_headers = {}
  @commit_sequence = 0
  @identifiers = []

  if options.key?(:snapshot)
    snapshot = options[:snapshot]
    snapshot_revision = snapshot.stream_revision
    max_revision = options[:max_revision]

    @stream_id = snapshot.stream_id
    # Commits are fetched starting at the snapshot revision, while the stream
    # is populated from the revision that follows it.
    commits = @persistence.get_from(@stream_id, snapshot_revision, max_revision)
    populate_stream(snapshot_revision + 1, max_revision, commits)
    @stream_revision = snapshot_revision + committed_events.length
  else
    @stream_id = options[:stream_id]
    @stream_revision = 0

    # Plain reads instead of `options[:key] ||= nil`: the old form stored nil
    # entries in the caller's hash and raised on a frozen hash. `|| nil`
    # keeps the previous treatment of a false value as "not given".
    min_revision = options[:min_revision] || nil
    max_revision = options[:max_revision] || nil

    # Nothing is loaded unless both bounds were supplied.
    unless min_revision.nil? || max_revision.nil?
      commits = @persistence.get_from(@stream_id, min_revision, max_revision)
      populate_stream(min_revision, max_revision, commits)

      raise StreamNotFoundError if min_revision > 0 && committed_events.empty?
    end
  end
end

Instance Attribute Details

#commit_sequence ⇒ Object (readonly)

Returns the value of attribute commit_sequence.



35
36
37
# File 'lib/euston-eventstore/optimistic_event_stream.rb', line 35

# Reader for @commit_sequence, which the constructor initializes to 0.
def commit_sequence
  @commit_sequence
end

#committed_events ⇒ Object (readonly)

Returns the value of attribute committed_events.



35
36
37
# File 'lib/euston-eventstore/optimistic_event_stream.rb', line 35

# Reader for @committed_events, which the constructor initializes to an empty
# array. The constructor checks its length and emptiness after populating the
# stream; presumably populate_stream fills it (not visible here; confirm).
def committed_events
  @committed_events
end

#committed_headers ⇒ Object (readonly)

Returns the value of attribute committed_headers.



35
36
37
# File 'lib/euston-eventstore/optimistic_event_stream.rb', line 35

# Reader for @committed_headers, which the constructor initializes to an
# empty hash.
def committed_headers
  @committed_headers
end

#stream_id ⇒ Object (readonly)

Returns the value of attribute stream_id.



35
36
37
# File 'lib/euston-eventstore/optimistic_event_stream.rb', line 35

# Reader for @stream_id. The constructor takes it from the snapshot when one
# is supplied, otherwise from options[:stream_id].
def stream_id
  @stream_id
end

#stream_revision ⇒ Object (readonly)

Returns the value of attribute stream_revision.



35
36
37
# File 'lib/euston-eventstore/optimistic_event_stream.rb', line 35

# Reader for @stream_revision. The constructor sets it to the snapshot
# revision plus the number of committed events when built from a snapshot,
# and to 0 otherwise.
def stream_revision
  @stream_revision
end

#uncommitted_commands ⇒ Object (readonly)

Returns the value of attribute uncommitted_commands.



35
36
37
# File 'lib/euston-eventstore/optimistic_event_stream.rb', line 35

# Reader for @uncommitted_commands: starts as an empty array, receives
# CommandMessage instances through #<<, and is replaced by a new empty array
# in #clear_changes.
def uncommitted_commands
  @uncommitted_commands
end

#uncommitted_events ⇒ Object (readonly)

Returns the value of attribute uncommitted_events.



35
36
37
# File 'lib/euston-eventstore/optimistic_event_stream.rb', line 35

# Reader for @uncommitted_events: starts as an empty array, receives
# EventMessage instances through #<<, and is replaced by a new empty array
# in #clear_changes.
def uncommitted_events
  @uncommitted_events
end

#uncommitted_headers ⇒ Object (readonly)

Returns the value of attribute uncommitted_headers.



35
36
37
# File 'lib/euston-eventstore/optimistic_event_stream.rb', line 35

# Reader for @uncommitted_headers: starts as an empty hash and is replaced by
# a new empty hash in #clear_changes.
def uncommitted_headers
  @uncommitted_headers
end

Instance Method Details

#<<(message) ⇒ Object



37
38
39
40
41
42
# File 'lib/euston-eventstore/optimistic_event_stream.rb', line 37

# Queues a message as a pending (uncommitted) change.
#
# A nil message, or a message whose body is nil, is ignored. A CommandMessage
# goes to the uncommitted commands and an EventMessage to the uncommitted
# events; the two checks are independent of each other.
#
# @param message [Object, nil] message responding to #body
# @return [Array, nil] the uncommitted events array when an EventMessage was
#   appended, otherwise nil
def <<(message)
  return if message.nil?
  return if message.body.nil?

  if message.is_a?(CommandMessage)
    @uncommitted_commands << message
  end

  # Last expression on purpose: its value (array or nil) is the return value.
  if message.is_a?(EventMessage)
    @uncommitted_events << message
  end
end

#clear_changes ⇒ Object



44
45
46
47
48
# File 'lib/euston-eventstore/optimistic_event_stream.rb', line 44

# Discards all pending changes by pointing the uncommitted commands, events
# and headers at brand-new empty containers. The previous containers are left
# untouched, so references handed out earlier keep their contents.
#
# @return [Hash] the new, empty uncommitted headers hash
def clear_changes
  @uncommitted_commands, @uncommitted_events = Array.new, Array.new
  @uncommitted_headers = Hash.new
end

#commit_changes(commit_id) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/euston-eventstore/optimistic_event_stream.rb', line 50

# Persists the pending changes under the given commit id.
#
# Does nothing (returns nil) when has_changes is falsy. When persisting fails
# with a ConcurrencyError, the commits after the current stream revision are
# fetched from persistence and fed to populate_stream before the same error
# is re-raised to the caller.
#
# @param commit_id [Object] identifier of the commit being written
# @return [Object, nil] whatever persist_changes returns, or nil when there
#   was nothing to commit
# @raise [Euston::EventStore::DuplicateCommitError] when commit_id is already
#   listed in @identifiers
# @raise [ConcurrencyError] re-raised after the stream has been refreshed
def commit_changes(commit_id)
  if @identifiers.include?(commit_id)
    raise Euston::EventStore::DuplicateCommitError
  end

  return unless has_changes

  begin
    persist_changes(commit_id)
  rescue ConcurrencyError => conflict
    # Catch up with commits written after our revision, then let the caller
    # deal with the conflict.
    stream = stream_id
    next_revision = stream_revision + 1
    newer_commits = @persistence.get_from(stream, next_revision, FIXNUM_MAX)
    populate_stream(next_revision, FIXNUM_MAX, newer_commits)

    raise conflict
  end
end