Class: Eventus::Persistence::Mongo
- Inherits: Object
- Inheritance hierarchy: Object → Eventus::Persistence::Mongo
- Defined in:
- lib/eventus/persistence/mongo.rb
Instance Attribute Summary collapse
- #commits ⇒ Object (readonly): Returns the value of attribute commits.
- #db ⇒ Object (readonly): Returns the value of attribute db.
Instance Method Summary collapse
- #commit(events) ⇒ Object
- #initialize(collection) ⇒ Mongo (constructor): A new instance of Mongo.
- #load(id, min = nil, max = nil) ⇒ Object
- #load_undispatched_commits ⇒ Object
- #mark_commit_dispatched(commit_id) ⇒ Object
Constructor Details
#initialize(collection) ⇒ Mongo
Returns a new instance of Mongo.
6 7 8 9 10 11 12 |
# File 'lib/eventus/persistence/mongo.rb', line 6

# Wraps a commits collection and makes sure its lookup indexes exist.
#
# @param collection [Object] collection used to store commits; it must
#   respond to +db+ and +ensure_index+
def initialize(collection)
  @db = collection.db
  @commits = collection

  ascending = ::Mongo::ASCENDING
  descending = ::Mongo::DESCENDING

  # Index specs are applied in the same order as the original three calls.
  index_specs = [
    { :sid => ascending },
    { :sid => ascending, :min => ascending },
    { :sid => ascending, :max => descending }
  ]
  index_specs.each { |spec| @commits.ensure_index(spec) }
end
Instance Attribute Details
#commits ⇒ Object (readonly)
Returns the value of attribute commits.
4 5 6 |
# File 'lib/eventus/persistence/mongo.rb', line 4

# Read-only accessor for the commits collection given to the constructor.
#
# @return [Object] the value of +@commits+
def commits
  @commits
end
#db ⇒ Object (readonly)
Returns the value of attribute db.
4 5 6 |
# File 'lib/eventus/persistence/mongo.rb', line 4

# Read-only accessor for the database obtained from the collection's +db+
# in the constructor.
#
# @return [Object] the value of +@db+
def db
  @db
end
Instance Method Details
#commit(events) ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/eventus/persistence/mongo.rb', line 14

# Persists a batch of events as a single commit document.
#
# The document's +_id+ is "<sid>_<lowest sequence>", and it records the
# lowest and highest sequence numbers of the batch in +min+ and +max+.
#
# @param events [Array<Hash>] events carrying 'sid' and 'sequence' keys; the
#   stream id is read from the first event only (the others are not checked)
# @return [Hash, nil] the inserted commit document, or nil when +events+ is
#   empty (nothing is queried or written in that case)
# @raise [Eventus::ConcurrencyError] when a stored commit for the same stream
#   already reaches the new batch's lowest sequence, or when the insert fails
#   with error code 11000
# @raise [::Mongo::OperationFailure] any other insert failure, re-raised as is
def commit(events)
  return if events.empty?

  sid = events.first['sid']
  # One pass for both bounds instead of separate min/max scans.
  lowest, highest = events.map { |event| event['sequence'] }.minmax

  doc = {
    '_id' => "#{sid}_#{lowest}",
    'sid' => sid,
    'min' => lowest,
    'max' => highest,
    'events' => events,
    'dispatched' => false
  }

  # Any stored commit whose max is at or beyond our min overlaps this batch.
  conflicting = @commits.find_one({ sid: sid, max: { :$gte => lowest } })
  raise Eventus::ConcurrencyError if conflicting

  begin
    @commits.insert(doc)
  rescue ::Mongo::OperationFailure => e
    # 11000 is treated as "another writer got there first" (same _id).
    raise Eventus::ConcurrencyError if e.error_code == 11000
    raise
  end

  doc
end
#load(id, min = nil, max = nil) ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/eventus/persistence/mongo.rb', line 37

# Loads the events of one stream, ordered by sequence number.
#
# Commits are selected by their stored +min+/+max+ bounds, so a commit that
# only partly overlaps the requested range is fetched as a whole; its events
# are then filtered individually.
#
# @param id [Object] stream id, matched against the commits' +sid+ field
# @param min [Integer, nil] lowest sequence to include; nil means no lower
#   bound in the query and 0 for the per-event filter
# @param max [Integer, nil] highest sequence to include; nil means unbounded
# @return [Array<Hash>] matching events sorted by their 'sequence' value
def load(id, min = nil, max = nil)
  Eventus.logger.debug "Loading stream: #{id}"

  query = { sid: id }
  query[:max] = { :$gte => min } if min
  query[:min] = { :$lte => max } if max

  lower = min || 0
  upper = max || Float::INFINITY

  @commits.find(query).to_a
    .flat_map { |commit| commit['events'] }
    .select { |event| event['sequence'].between?(lower, upper) }
    .sort_by { |event| event['sequence'] }
end