Class: Eventus::Persistence::Mongo

Inherits:
Object
  • Object
show all
Defined in:
lib/eventus/persistence/mongo.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(collection) ⇒ Mongo

Returns a new instance of Mongo.



6
7
8
9
10
11
12
# File 'lib/eventus/persistence/mongo.rb', line 6

def initialize(collection)
  @db = collection.db
  @commits = collection
  @commits.ensure_index :sid => ::Mongo::ASCENDING
  @commits.ensure_index :sid => ::Mongo::ASCENDING, :min => ::Mongo::ASCENDING
  @commits.ensure_index :sid => ::Mongo::ASCENDING, :max => ::Mongo::DESCENDING
end

Instance Attribute Details

#commitsObject (readonly)

Returns the value of attribute commits.



4
5
6
# File 'lib/eventus/persistence/mongo.rb', line 4

def commits
  @commits
end

#dbObject (readonly)

Returns the value of attribute db.



4
5
6
# File 'lib/eventus/persistence/mongo.rb', line 4

def db
  @db
end

Instance Method Details

#commit(events) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/eventus/persistence/mongo.rb', line 14

def commit(events)
  return if events.empty?
  seqs = events.map{|e| e['sequence']}
  doc = {
    '_id'        => "#{events[0]['sid']}_#{seqs.min}",
    'sid'        => events[0]['sid'],
    'min'        => seqs.min,
    'max'        => seqs.max,
    'events'     => events,
    'dispatched' => false
  }

  future = @commits.find_one({sid:doc['sid'], max:{:$gte => doc['min']}})
  raise Eventus::ConcurrencyError if future
  begin
    @commits.insert(doc)
  rescue ::Mongo::OperationFailure => e
    raise Eventus::ConcurrencyError if e.error_code == 11000
    raise
  end
  doc
end

#load(id, min = nil, max = nil) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/eventus/persistence/mongo.rb', line 37

def load(id, min = nil, max = nil)
  Eventus.logger.debug "Loading stream: #{id}"
  query = {sid:id}

  if min
    query[:max] = {:$gte => min}
  end

  if max
    query[:min] = {:$lte => max}
  end

  min ||= 0
  max ||= Float::INFINITY

  @commits.find(query).to_a
    .map{|c| c['events']}
    .flatten
    .reject{|e| e['sequence'] < min}
    .reject{|e| e['sequence'] > max}
    .sort_by{|e| e['sequence']}
end

#load_undispatched_commitsObject



60
61
62
63
64
65
66
# File 'lib/eventus/persistence/mongo.rb', line 60

def load_undispatched_commits
  commits = @commits.find(dispatched:false).to_a
  if commits.length > 0
    Eventus.logger.info "#{commits.length} undispatched commits loaded"
  end
  commits
end

#mark_commit_dispatched(commit_id) ⇒ Object



68
69
70
71
# File 'lib/eventus/persistence/mongo.rb', line 68

def mark_commit_dispatched(commit_id)
  Eventus.logger.debug "Marking commit #{commit_id} dispatched"
  @commits.update({_id: commit_id}, {'$set' => {'dispatched' => true}})
end