Class: EntityStoreSequel::PostgresEntityStore

Inherits:
Object
  • Object
show all
Includes:
EntityStore::Logging
Defined in:
lib/entity_store_sequel/postgres_entity_store.rb

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Class Attribute Details

.connect_timeout=(value) ⇒ Object (writeonly)

Sets the attribute connect_timeout

Parameters:

  • value

    the value to set the attribute connect_timeout to.



16
17
18
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 16

# Public: stores the connect timeout in @connect_timeout.
#
# value - the value to remember; it is stored as given, without validation.
#
# Returns the value that was assigned.
def connect_timeout=(value)
  instance_variable_set(:@connect_timeout, value)
end

.connection_string ⇒ Object

Returns the value of attribute connection_string.



15
16
17
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 15

# Public: reads the connection string held in @connection_string.
#
# Returns the stored value, or nil when nothing has been assigned yet.
def connection_string
  instance_variable_get(:@connection_string)
end

.native_json_hash ⇒ Object

Returns the value of attribute native_json_hash.



17
18
19
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 17

# Public: reads the flag held in @native_json_hash.
#
# Returns the stored value, or nil when nothing has been assigned yet.
def native_json_hash
  instance_variable_get(:@native_json_hash)
end

Class Method Details

.database ⇒ Object



19
20
21
22
23
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 19

# Public: the database handle, connected on first use.
#
# When @_database is already set it is returned as is. Otherwise a
# connection is opened with Sequel.connect(connection_string) and handed
# to the database= writer (not assigned to the ivar directly), so whatever
# the writer configures is applied to the new connection as well.
#
# Returns the database object.
def database
  @_database || (self.database = Sequel.connect(connection_string))
end

.database=(db) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 25

# Public: installs the database handle and records whether JSON columns
# come back as native hashes.
#
# db - a database object responding to adapter_scheme (and to extension
#      when the scheme is :postgres).
#
# For the :postgres adapter scheme the :pg_json extension is loaded on the
# handle and native_json_hash is set to true; for every other scheme
# native_json_hash is set to false.
#
# Returns the stored database object.
def database=(db)
  @_database = db

  postgres = db.adapter_scheme == :postgres
  @_database.extension(:pg_json) if postgres
  self.native_json_hash = postgres

  @_database
end

.init ⇒ Object



38
39
40
41
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 38

# Public: runs the bundled Sequel migrations against the database.
#
# The migration directory is "../../sequel/migrations" resolved against
# this file's path. The migrator is told to use the
# :entity_store_schema_migration table for its own bookkeeping.
#
# Returns whatever Sequel::Migrator.run returns.
def init
  Sequel::Migrator.run(
    self.database,
    File.expand_path("../../sequel/migrations", __FILE__),
    :table => :entity_store_schema_migration
  )
end

Instance Method Details

#add_entity(entity, id = BSON::ObjectId.new) ⇒ Object



73
74
75
76
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 73

# Public: inserts a row for a new entity.
#
# entity - object whose class name and version are stored in the row.
# id     - identifier for the row (default: a new BSON::ObjectId); it is
#          stored in its String form.
#
# Returns the id as a String.
def add_entity(entity, id = BSON::ObjectId.new)
  id.to_s.tap do |entity_id|
    entities.insert(:id => entity_id, :_type => entity.class.name, :version => entity.version)
  end
end

#add_events(items) ⇒ Object



120
121
122
123
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 120

# Public: stores events, giving each one a freshly generated id.
#
# items - collection of events; each is paired with a new BSON::ObjectId.
#
# Returns the result of add_events_with_ids for the generated pairs.
def add_events(items)
  add_events_with_ids(items.map { |event| [BSON::ObjectId.new, event] })
end

#add_events_with_ids(event_id_map) ⇒ Object



125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 125

# Public: inserts one row per event using the supplied ids.
#
# event_id_map - enumerable of [id, event] pairs.
#
# For every pair a row is inserted with the id (as a String), the event's
# class name, entity id (as a String), entity version and :at attribute.
# The remaining attributes (everything except :entity_id, :at and
# :entity_version) are serialised with PigeonHole.generate into :data.
# Rows are inserted one at a time.
#
# Returns event_id_map.
def add_events_with_ids(event_id_map)
  event_id_map.each do |event_id, event|
    attrs = event.attributes
    row = {
      :id => event_id.to_s,
      :_type => event.class.name,
      :_entity_id => event.entity_id.to_s,
      :entity_version => event.entity_version,
      :at => attrs[:at],
      :data => PigeonHole.generate(hash_without_keys(attrs, :entity_id, :at, :entity_version)),
    }
    events.insert(row)
  end
end

#clear ⇒ Object



56
57
58
59
60
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 56

# Public: drops the :entities and :entity_events tables and forgets the
# memoised datasets, so the next call to entities / events looks them up
# again.
#
# Returns nil.
def clear
  open.drop_table(:entities, :entity_events)
  @entities_collection = @events_collection = nil
end

#clear_entity_events(id, excluded_types) ⇒ Object



65
66
67
68
69
70
71
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 65

# Public: deletes the events of one entity, keeping selected event types.
#
# id             - entity id whose events are considered (matched against
#                  the _entity_id column).
# excluded_types - collection of event type names to keep; it only needs
#                  to respond to include?.
#
# The matching events are read first and then deleted one by one by id.
#
# Returns the dataset that was iterated.
def clear_entity_events(id, excluded_types)
  events.where(_entity_id: id).each do |row|
    events.where(id: row[:id]).delete unless excluded_types.include?(row[:_type])
  end
end

#ensure_indexes ⇒ Object



62
63
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 62

# Public: intentionally does nothing in this store.
#
# Returns nil.
def ensure_indexes
  nil
end

#entities ⇒ Object



48
49
50
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 48

# Public: the dataset for the :entities table.
#
# The lookup open[:entities] is done once and kept in
# @entities_collection until that variable is reset.
#
# Returns the memoised dataset.
def entities
  return @entities_collection if @entities_collection

  @entities_collection = open[:entities]
end

#events ⇒ Object



52
53
54
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 52

# Public: the dataset for the :entity_events table.
#
# The lookup open[:entity_events] is done once and kept in
# @events_collection until that variable is reset.
#
# Returns the memoised dataset.
def events
  return @events_collection if @events_collection

  @events_collection = open[:entity_events]
end

#get_entities(ids, options = {}) ⇒ Object

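Public: loads the entities from the store, restoring each one from its snapshot when a usable snapshot is available. The events needed to complete the state are not applied by this method; they are loaded separately (see #get_events).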

ids - Array of String representations of BSON::ObjectId

options - Hash of options (default: {})

:raise_exception - Boolean (default: true)

Returns an array of entities



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 147

# Public: instantiates the entities stored under the given ids.
#
# ids     - Array of String ids. Each one is checked with
#           BSON::ObjectId.from_string before the query runs.
# options - Hash of options (default: {}):
#           :raise_exception - when true (the default) an id that is not
#                              a valid ObjectId string raises NotFound;
#                              when false the id is left in the query
#                              as is.
#
# For each row found the entity type is resolved with
# EntityStore::Config.load_type. If the type declares
# entity_store_snapshot_key and it differs from the row's :snapshot_key,
# the stored snapshot is ignored. An entity is then built from the
# snapshot (read natively or parsed with PigeonHole, depending on the
# class-level native_json_hash flag) or, without a snapshot, from just
# its id.
#
# Returns an Array of entities, one per row found.
# Raises NotFound for an invalid id (unless disabled) and re-raises any
# error hit while building an entity after logging it.
def get_entities(ids, options={})
  ids.each do |candidate|
    begin
      BSON::ObjectId.from_string(candidate)
    rescue BSON::ObjectId::Invalid
      raise NotFound.new(candidate) if options.fetch(:raise_exception, true)
    end
  end

  entities.where(:id => ids).map do |row|
    begin
      type = EntityStore::Config.load_type(row[:_type])

      # A snapshot written under a different key is stale: drop it.
      if type.respond_to?(:entity_store_snapshot_key)
        row.delete(:snapshot) unless type.entity_store_snapshot_key == row[:snapshot_key]
      end

      snapshot = row[:snapshot]
      state =
        if !snapshot
          { 'id' => row[:id].to_s }
        elsif self.class.native_json_hash
          snapshot.to_h
        else
          PigeonHole.parse(snapshot)
        end

      type.new(state)
    rescue => e
      log_error "Error loading type #{row[:_type]}", e
      raise
    end
  end
end

#get_events(criteria) ⇒ Object

Public: get events for an array of criteria objects

because each entity could have a different reference
version this allows optional criteria to be specified

criteria - Hash :id mandatory, :since_version optional

Examples

get_events([ { id: "23232323" }, { id: "2398429834", since_version: 4 } ])

Returns Hash with id as key and Array of Event instances as value



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 204

# Public: loads the events for several entities in one query.
#
# criteria - Array of Hashes, each with :id and optionally :since_version.
#            With :since_version only events whose entity_version is
#            greater than that value are selected for the id.
#
# One condition is built per criterion and attached to the events dataset
# with the method named by filter_method_name(index). The rows are grouped
# by _entity_id, ordered by [entity_version, id] and turned into event
# objects through EntityStore::Config.load_type. A row that fails to load
# is logged and left out of the result.
#
# Returns a Hash mapping entity id to an Array of events; every id in
# criteria is present (as a String key) even when it has no events.
# Returns {} immediately for empty criteria.
def get_events(criteria)
  return {} if criteria.empty?

  query = criteria.each_with_index.inject(events) do |dataset, (item, index)|
    filter_method = filter_method_name(index)
    condition =
      if item[:since_version]
        Sequel.lit('_entity_id = ? AND entity_version > ?', item[:id], item[:since_version])
      else
        Sequel.lit('_entity_id = ?', item[:id])
      end
    dataset.send(filter_method, condition)
  end

  grouped = query.to_hash_groups(:_entity_id)

  grouped.each_value do |rows|
    rows.sort_by! { |row| [row[:entity_version], row[:id].to_s] }

    # Replace each row with its event object (nil when it cannot be built).
    rows.map! do |row|
      begin
        payload = row[:data]
        hash = self.class.native_json_hash ? payload.to_h : PigeonHole.parse(payload)

        hash[:_id] = row[:id]
        hash[:entity_version] = row[:entity_version]
        hash[:entity_id] = row[:_entity_id]
        hash[:at] = row[:at]
        EntityStore::Config.load_type(row[:_type]).new(hash)
      rescue => e
        log_error "Error loading type #{row[:_type]}", e
        nil
      end
    end

    rows.compact!
  end

  # Make sure every requested id has an entry.
  criteria.each { |item| grouped[item[:id].to_s] ||= [] }

  grouped
end

#open ⇒ Object



44
45
46
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 44

# Public: the database handle used by this store.
#
# Delegates to the class-level PostgresEntityStore.database.
#
# NOTE: because this instance method is named `open`, a bare `open` call
# inside this class resolves here rather than to Kernel#open.
#
# Returns whatever PostgresEntityStore.database returns.
def open
  PostgresEntityStore.database
end

#remove_entity_snapshot(id) ⇒ Object

Public - remove the snapshot for an entity



104
105
106
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 104

# Public: clears the stored snapshot of a single entity.
#
# id - id of the entity row whose :snapshot column is set to nil.
#
# Returns the result of the dataset update.
def remove_entity_snapshot(id)
  entity_row = entities.where(:id => id)
  entity_row.update(:snapshot => nil)
end

#remove_snapshots(type = nil) ⇒ Object

Public: remove all snapshots

type - String optional class name for the entity



112
113
114
115
116
117
118
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 112

# Public: clears stored snapshots, for one entity type or for all.
#
# type - optional String class name; when given only rows whose :_type
#        equals it are touched, otherwise every row is updated
#        (default: nil).
#
# Returns the result of the dataset update.
def remove_snapshots(type=nil)
  scope = type ? entities.where(:_type => type) : entities
  scope.update(:snapshot => nil)
end

#save_entity(entity) ⇒ Object



78
79
80
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 78

# Public: writes the entity's current version to its row.
#
# entity - object responding to id and version.
#
# Returns the result of the dataset update.
def save_entity(entity)
  entity_row = entities.where(:id => entity.id)
  entity_row.update(:version => entity.version)
end

#snapshot_entity(entity) ⇒ Object

Public: create a snapshot of the entity and store in the entities collection



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 84

# Public: stores a snapshot of the entity in its row of the entities table.
#
# entity - object responding to id, version and attributes.
#
# If no row exists for the entity's id, one is inserted first with the
# id, class name and version. The row is then updated with the entity's
# attributes serialised by PigeonHole.generate, together with the
# snapshot key declared by the entity's class (nil when the class does
# not respond to entity_store_snapshot_key).
#
# Returns the result of the dataset update.
def snapshot_entity(entity)
  entity_class = entity.class
  # Store the class's snapshot key when it has one; otherwise clear it.
  snapshot_key =
    entity_class.respond_to?(:entity_store_snapshot_key) ? entity_class.entity_store_snapshot_key : nil

  row_missing = !entities[:id => entity.id]
  if row_missing
    entities.insert(:id => entity.id, :_type => entity_class.name, :version => entity.version)
  end

  entity_row = entities.where(:id => entity.id)
  entity_row.update(:snapshot => PigeonHole.generate(entity.attributes), :snapshot_key => snapshot_key)
end