Class: Oplogjam::Schema

Inherits:
Object
  • Object
show all
Defined in:
lib/oplogjam/schema.rb

Constant Summary collapse

COLUMNS =
%i[id document created_at updated_at].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(db) ⇒ Schema

Returns a new instance of Schema.



7
8
9
# File 'lib/oplogjam/schema.rb', line 7

def initialize(db)
  @db = db
end

Instance Attribute Details

#dbObject (readonly)

Returns the value of attribute db.



5
6
7
# File 'lib/oplogjam/schema.rb', line 5

def db
  @db
end

Instance Method Details

#add_indexes(name) ⇒ Object



37
38
39
40
41
42
# File 'lib/oplogjam/schema.rb', line 37

def add_indexes(name)
  db.alter_table(name) do
    add_index %i[id deleted_at], unique: true, if_not_exists: true
    add_index :id, unique: true, where: { deleted_at: nil }, if_not_exists: true
  end
end

#create_table(name) ⇒ Object



26
27
28
29
30
31
32
33
34
35
# File 'lib/oplogjam/schema.rb', line 26

def create_table(name)
  db.create_table?(name) do
    uuid :uuid, default: Sequel.function(:uuid_generate_v1), primary_key: true
    jsonb :id, null: false
    jsonb :document, null: false
    timestamp :created_at, null: false
    timestamp :updated_at, null: false
    timestamp :deleted_at
  end
end

#import(collection, name, batch_size = 100) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/oplogjam/schema.rb', line 11

def import(collection, name, batch_size = 100)
  collection.find.snapshot(true).each_slice(batch_size) do |documents|
    values = documents.map { |document|
      [
        Sequel.object_to_json(document.fetch(ID)),
        Sequel.pg_jsonb(document),
        Time.now.utc,
        Time.now.utc
      ]
    }

    db[name].import(COLUMNS, values)
  end
end