Class: EntityStoreSequel::PostgresEntityStore

Inherits:
Object
  • Object
show all
Includes:
EntityStore::Logging
Defined in:
lib/entity_store_sequel/postgres_entity_store.rb

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(database_connection = nil) ⇒ PostgresEntityStore

Returns a new instance of PostgresEntityStore.



44
45
46
47
48
49
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 44

# Public: build a store, optionally bound to its own database connection.
#
# database_connection - connection object to use for this instance
#                       (default: nil). When nil, nothing is stored on the
#                       instance.
def initialize(database_connection = nil)
  if database_connection
    @database_connection = database_connection
    # Load the :pg_json extension on the connection that was handed in.
    database_connection.extension :pg_json
  end
end

Class Attribute Details

.connect_timeout=(value) ⇒ Object (writeonly)

Sets the attribute connect_timeout

Parameters:

  • value

    the value to set the attribute connect_timeout to.



16
17
18
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 16

# Sets the class-level connect_timeout attribute.
#
# value - the value to store in @connect_timeout.
#
# NOTE(review): none of the code shown on this page reads @connect_timeout
# (the class-level database method connects using connection_string only) —
# confirm whether it is consumed elsewhere.
def connect_timeout=(value)
  @connect_timeout = value
end

.connection_string ⇒ Object

Returns the value of attribute connection_string.



15
16
17
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 15

# Returns the class-level connection_string attribute; the class-level
# database method passes it to Sequel.connect.
def connection_string
  @connection_string
end

.native_json_hash ⇒ Object

Returns the value of attribute native_json_hash.



17
18
19
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 17

# Returns the class-level native_json_hash flag. database= sets it to true
# for the :postgres adapter scheme and false otherwise; get_entities and
# get_events use it to choose between #to_h and PigeonHole.parse.
def native_json_hash
  @native_json_hash
end

Class Method Details

.database ⇒ Object



19
20
21
22
23
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 19

# Returns the shared database handle, connecting on first use.
#
# When no handle has been stored yet, a connection is opened with
# Sequel.connect(connection_string) and routed through database= so the
# adapter-specific setup runs.
def database
  @_database || (self.database = Sequel.connect(connection_string))
end

.database=(db) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 25

# Stores the shared database handle and configures JSON handling for it.
#
# db - database object responding to adapter_scheme (and to extension when
#      the scheme is :postgres).
#
# For the :postgres adapter scheme the :pg_json extension is loaded and
# native_json_hash is switched on; for any other scheme it is switched off.
#
# Returns the database that was passed in.
def database=(db)
  @_database = db

  postgres = db.adapter_scheme == :postgres
  db.extension :pg_json if postgres
  self.native_json_hash = postgres

  db
end

.init ⇒ Object



38
39
40
41
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 38

# Runs Sequel::Migrator against the shared database, using the
# sequel/migrations directory resolved relative to this file and
# :entity_store_schema_migration as the migrator's table option.
def init
  Sequel::Migrator.run(
    database,
    File.expand_path("../../sequel/migrations", __FILE__),
    table: :entity_store_schema_migration
  )
end

Instance Method Details

#add_entity(entity, id = BSON::ObjectId.new) ⇒ Object



80
81
82
83
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 80

# Public: insert a row for a new entity.
#
# entity - object whose class name and version are recorded.
# id     - identifier for the row (default: a new BSON::ObjectId); it is
#          stored in its String form.
#
# Returns the id as a String.
def add_entity(entity, id = BSON::ObjectId.new)
  entity_id = id.to_s
  entities.insert(id: entity_id, _type: entity.class.name, version: entity.version)
  entity_id
end

#add_events(items) ⇒ Object



127
128
129
130
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 127

# Public: store events, generating a fresh BSON::ObjectId for each one.
#
# items - collection of event objects.
#
# Returns whatever add_events_with_ids returns for the [id, event] pairs.
def add_events(items)
  add_events_with_ids(
    items.map do |event|
      [BSON::ObjectId.new, event]
    end
  )
end

#add_events_with_ids(event_id_map) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 132

# Public: insert one row per [id, event] pair.
#
# event_id_map - collection yielding [id, event] pairs.
#
# Each row records the id and entity id in String form, the event's class
# name, its entity version and :at attribute. The remaining attributes
# (everything except :entity_id, :at and :entity_version) are serialized
# with PigeonHole.generate into :data.
#
# Returns event_id_map.
def add_events_with_ids(event_id_map)
  event_id_map.each do |event_id, event|
    attributes = event.attributes

    row = {
      id: event_id.to_s,
      _type: event.class.name,
      _entity_id: event.entity_id.to_s,
      entity_version: event.entity_version,
      at: attributes[:at],
      data: PigeonHole.generate(hash_without_keys(attributes, :entity_id, :at, :entity_version))
    }

    events.insert(row)
  end
end

#clear ⇒ Object



63
64
65
66
67
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 63

# Drops the :entities and :entity_events tables and forgets the memoized
# dataset handles so the next call to entities / events looks them up again.
#
# Returns nil.
def clear
  open.drop_table(:entities, :entity_events)
  @entities_collection = @events_collection = nil
end

#clear_entity_events(id, excluded_types) ⇒ Object



72
73
74
75
76
77
78
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 72

# Public: delete the stored events of one entity, keeping selected types.
#
# id             - entity id whose events are removed (matched against
#                  _entity_id).
# excluded_types - list of event type names (as stored in _type) that must
#                  be kept. An empty list or nil removes every event of the
#                  entity.
#
# The deletion is a single filtered DELETE instead of loading every event
# row and deleting them one at a time.
#
# NOTE(review): when excluded_types is non-empty the filter is a SQL NOT IN,
# so rows whose _type is NULL are left in place — confirm _type is never
# NULL in practice.
#
# Returns the result of the dataset's delete call (in Sequel, the number of
# rows deleted).
def clear_entity_events(id, excluded_types)
  doomed = events.where(_entity_id: id)

  kept_types = Array(excluded_types)
  doomed = doomed.exclude(_type: kept_types) unless kept_types.empty?

  doomed.delete
end

#ensure_indexes ⇒ Object



69
70
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 69

# Intentionally empty: this store does nothing when asked to ensure indexes.
# NOTE(review): presumably kept so the store exposes the same interface as
# other entity store backends — confirm.
def ensure_indexes
end

#entities ⇒ Object



55
56
57
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 55

# Returns the :entities dataset from the open database, memoized in
# @entities_collection (clear resets the memo).
def entities
  @entities_collection ||= open[:entities]
end

#events ⇒ Object



59
60
61
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 59

# Returns the :entity_events dataset from the open database, memoized in
# @events_collection (clear resets the memo).
def events
  @events_collection ||= open[:entity_events]
end

#get_entities(ids, options = {}) ⇒ Object

Public: loads the entity from the store, including any available snapshots then loads the events to complete the state

ids - Array of String representations of BSON::ObjectId

options - Hash of options (default: {})

:raise_exception - Boolean (default: true)

Returns an array of entities



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 154

# Public: load entities by id, restoring each from its snapshot when a
# usable one is stored. Events are not applied here.
#
# ids     - Array of String ids; each must parse with
#           BSON::ObjectId.from_string.
# options - Hash of options (default: {}):
#           :raise_exception - when true (the default) an id that fails to
#                              parse raises NotFound; when false such ids
#                              are let through to the query unchanged.
#
# Returns an Array with one entity per row found.
# Raises NotFound for an unparsable id (see :raise_exception); any error hit
# while building an entity is logged and re-raised.
def get_entities(ids, options={})
  ids.each do |candidate|
    begin
      BSON::ObjectId.from_string(candidate)
    rescue BSON::ObjectId::Invalid
      raise NotFound.new(candidate) if options.fetch(:raise_exception, true)
    end
  end

  entities.where(id: ids).map do |row|
    begin
      type = EntityStore::Config.load_type(row[:_type])

      # A type may declare a snapshot key; a snapshot stored under a
      # different key is discarded.
      if type.respond_to?(:entity_store_snapshot_key)
        row.delete(:snapshot) unless type.entity_store_snapshot_key == row[:snapshot_key]
      end

      snapshot = row[:snapshot]
      seed =
        if !snapshot
          # No usable snapshot: start from a bare entity carrying only its id.
          { 'id' => row[:id].to_s }
        elsif self.class.native_json_hash
          snapshot.to_h
        else
          PigeonHole.parse(snapshot)
        end

      type.new(seed)
    rescue => e
      log_error "Error loading type #{row[:_type]}", e
      raise
    end
  end
end

#get_events(criteria) ⇒ Object

Public: get events for an array of criteria objects

Because each entity could have a different reference version,
this allows optional criteria to be specified.

criteria - Array of Hashes; in each Hash :id is mandatory and :since_version is optional

Examples

get_events([ { id: "23232323" }, { id: "2398429834", since_version: 4 } ])

Returns Hash with id as key and Array of Event instances as value



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 211

# Public: fetch the events for several entities in one query.
#
# criteria - collection of Hashes, each with :id and optionally
#            :since_version (only events with a greater entity_version are
#            returned for that id).
#
# Rows are grouped by _entity_id, ordered by [entity_version, id] and turned
# into event objects. A row that fails to load is logged and dropped rather
# than aborting the whole call.
#
# Returns a Hash of entity id => Array of events; every requested id is
# present, with an empty Array when nothing was found. Returns {} for empty
# criteria.
def get_events(criteria)
  return {} if criteria.empty?

  # Chain one condition per criterion; filter_method_name picks which
  # dataset method is used for the condition at each position.
  query = criteria.each_with_index.reduce(events) do |dataset, (item, index)|
    filter_method = filter_method_name(index)
    condition =
      if item[:since_version]
        Sequel.lit('_entity_id = ? AND entity_version > ?', item[:id], item[:since_version])
      else
        Sequel.lit('_entity_id = ?', item[:id])
      end

    dataset.send(filter_method, condition)
  end

  result = query.to_hash_groups(:_entity_id)

  result.each_value do |rows|
    rows.sort_by! { |attrs| [attrs[:entity_version], attrs[:id].to_s] }

    # Swap each row for its event object (nil when loading fails).
    rows.map! do |attrs|
      begin
        data = attrs[:data]
        hash = self.class.native_json_hash ? data.to_h : PigeonHole.parse(data)

        hash.merge!(
          _id: attrs[:id],
          entity_version: attrs[:entity_version],
          entity_id: attrs[:_entity_id],
          at: attrs[:at]
        )

        EntityStore::Config.load_type(attrs[:_type]).new(hash)
      rescue => e
        log_error "Error loading type #{attrs[:_type]}", e
        nil
      end
    end

    rows.compact!
  end

  # Guarantee an entry for every requested id.
  criteria.each { |item| result[item[:id].to_s] ||= [] }

  result
end

#open ⇒ Object



51
52
53
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 51

# Returns the database to use: the connection given to the constructor when
# there is one, otherwise the class-level PostgresEntityStore.database.
def open
  @database_connection || PostgresEntityStore.database
end

#remove_entity_snapshot(id) ⇒ Object

Public: remove the snapshot for an entity



111
112
113
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 111

# Public: remove the snapshot for an entity.
#
# id - id of the entity row whose :snapshot column is set to nil.
#
# Returns the result of the dataset update.
def remove_entity_snapshot(id)
  entity_row = entities.where(id: id)
  entity_row.update(snapshot: nil)
end

#remove_snapshots(type = nil) ⇒ Object

Public: remove all snapshots

type - String optional class name for the entity



119
120
121
122
123
124
125
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 119

# Public: remove all snapshots.
#
# type - String optional class name for the entity (default: nil). When
#        given, only rows whose _type matches are cleared; otherwise every
#        row is.
#
# Returns the result of the dataset update.
def remove_snapshots(type=nil)
  scope = type ? entities.where(_type: type) : entities
  scope.update(snapshot: nil)
end

#save_entity(entity) ⇒ Object



85
86
87
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 85

# Public: persist the entity's current version number.
#
# entity - object responding to id and version; the row matching its id has
#          :version updated.
#
# Returns the result of the dataset update.
def save_entity(entity)
  entity_row = entities.where(id: entity.id)
  entity_row.update(version: entity.version)
end

#snapshot_entity(entity) ⇒ Object

Public: create a snapshot of the entity and store in the entities collection



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/entity_store_sequel/postgres_entity_store.rb', line 91

# Public: create a snapshot of the entity and store it on the entity's row.
#
# entity - object responding to id, version and attributes.
#
# The row is inserted first when none exists for the entity's id. The
# snapshot is the entity's attributes serialized with PigeonHole.generate.
# snapshot_key is taken from the entity class when it responds to
# entity_store_snapshot_key and is written as nil otherwise, clearing any
# previously stored key.
#
# Returns the result of the dataset update.
def snapshot_entity(entity)
  entity_class = entity.class
  snapshot_key = entity_class.entity_store_snapshot_key if entity_class.respond_to?(:entity_store_snapshot_key)

  entity_id = entity.id
  unless entities[id: entity_id]
    entities.insert(id: entity_id, _type: entity_class.name, version: entity.version)
  end

  entity_row = entities.where(id: entity_id)
  entity_row.update(snapshot: PigeonHole.generate(entity.attributes), snapshot_key: snapshot_key)
end