Class: Dataflow::Adapters::SqlAdapter

Inherits:
Object
  • Object
show all
Defined in:
lib/dataflow/adapters/sql_adapter.rb

Overview

Interface between a data node and a SQL database. We use Sequel to perform all the store/retrieve operations.

Direct Known Subclasses

MysqlAdapter, PsqlAdapter

Constant Summary collapse

SYSTEM_ID =
:_id

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(args) ⇒ SqlAdapter

Returns a new instance of SqlAdapter.



72
73
74
75
76
# File 'lib/dataflow/adapters/sql_adapter.rb', line 72

# Builds the adapter: parses the given connection settings, obtains a
# (possibly shared) Sequel client for them, and caches the table schema.
#
# @param args [Hash] connection settings, parsed by Dataflow::Adapters::Settings.
def initialize(args)
  update_settings(args)
  @client = SqlAdapter.client(settings)
  @schema = settings.schema || [] # TODO: detect if the table schema has a mis-match
end

Instance Attribute Details

#clientObject (readonly)

Returns the value of attribute client.



70
71
72
# File 'lib/dataflow/adapters/sql_adapter.rb', line 70

# @return [Sequel::Database] the underlying Sequel connection (read-only).
def client
  @client
end

#settingsObject (readonly)

Returns the value of attribute settings.



69
70
71
# File 'lib/dataflow/adapters/sql_adapter.rb', line 69

# @return [Dataflow::Adapters::Settings] the parsed connection settings (read-only).
def settings
  @settings
end

Class Method Details

.client(settings, db_name: nil) ⇒ Sequel::Database

Get (or create) a client that satisfies the given connection settings.

Parameters:

  • settings (Hash)

    Represents the connection settings to the DB.

  • db_name (String) (defaults to: nil)

    The database name to which the client will connect.

Returns:

  • (Sequel::Database)

    a sequel database object.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/dataflow/adapters/sql_adapter.rb', line 12

# Get (or create) a client that satisfies the given connection settings.
# Clients are memoized per connection URI, so repeated calls with the same
# settings share a single Sequel::Database.
#
# @param settings [Hash] Represents the connection settings to the DB.
# @param db_name [String] The database name to which the client will connect
#   (defaults to settings.db_name).
# @return [Sequel::Database] a sequel database object.
def client(settings, db_name: nil)
  @clients ||= {}

  # Resolve host/port/credentials from the environment with local defaults.
  # NOTE(review): for any adapter_type other than 'mysql2'/'postgresql',
  # host/port/user/password stay nil and the URI below is malformed —
  # confirm only these two adapters are expected here.
  case settings.adapter_type
  when 'mysql2'
    host = ENV['MOJACO_MYSQL_ADDRESS'] || '127.0.0.1'
    port = ENV['MOJACO_MYSQL_PORT'] || '3306'
    user = ENV['MOJACO_MYSQL_USER']
    password = ENV['MOJACO_MYSQL_PASSWORD']
  when 'postgresql'
    host = ENV['MOJACO_POSTGRESQL_ADDRESS'] || '127.0.0.1'
    port = ENV['MOJACO_POSTGRESQL_PORT'] || '5432'
    user = ENV['MOJACO_POSTGRESQL_USER']
    password = ENV['MOJACO_POSTGRESQL_PASSWORD']
  end

  db_name ||= settings.db_name
  # NOTE(review): if user is nil but password is set, `user_password += ...`
  # raises NoMethodError on nil — presumably both env vars are always set
  # together; verify.
  user_password = user
  user_password += ":#{password}" if password.present?

  uri = "#{settings.adapter_type}://#{user_password}@#{host}:#{port}"
  # An explicit connection_uri in the settings marks an externally-managed DB.
  connection_uri = settings.connection_uri || "#{uri}/#{db_name}"

  return @clients[connection_uri] if @clients[connection_uri].present?

  # first, make sure the DB is created (if it is not an external db)
  is_external_db = settings.connection_uri.present?
  try_create_db(uri, db_name, user, password) unless is_external_db

  # then, create the connection object
  @clients[connection_uri] ||= Sequel.connect("#{connection_uri}?encoding=utf8")
end

.disconnect_clientsObject

Force the clients to disconnect their connections. Use before forking.



61
62
63
64
# File 'lib/dataflow/adapters/sql_adapter.rb', line 61

# Force every cached client to drop its DB connections. Use before forking
# so child processes do not share sockets with the parent.
def disconnect_clients
  @clients ||= {}
  @clients.values.each { |connection| connection.disconnect }
end

.try_create_db(uri, db_name, user, password) ⇒ Boolean

Used internally to try to create the DB automatically.

Parameters:

  • uri (String)

    the connection uri to the DB.

  • db_name (String)

    the database name.

Returns:

  • (Boolean)

    whether the db was created or not.



49
50
51
52
53
54
55
56
57
# File 'lib/dataflow/adapters/sql_adapter.rb', line 49

# Used internally to try to create the DB automatically.
# Any database error (most commonly "database already exists") is
# swallowed and reported as failure.
#
# @param uri [String] the connection uri to the DB (without the db name).
# @param db_name [String] the database name.
# @param user [String] user to authenticate as.
# @param password [String] password for the user.
# @return [Boolean] whether the db was created or not.
def try_create_db(uri, db_name, user, password)
  Sequel.connect(uri, user: user, password: password) do |db|
    # NOTE(review): db_name is interpolated directly into SQL; it must come
    # from trusted settings, never from user input.
    db.run("CREATE DATABASE #{db_name}")
    true
  end
rescue Sequel::DatabaseError
  # Ignore the error (e.g. the DB already exists) and signal failure.
  false
end

Instance Method Details

#all(where: {}, fields: [], sort: {}, offset: 0, limit: 0) ⇒ Object

retrieve all elements from a data node



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/dataflow/adapters/sql_adapter.rb', line 92

# Retrieve all elements from a data node (the read dataset).
#
# @param where [Hash] query passed to #apply_query to filter records.
# @param fields [Array] columns to select; defaults to every column
#   except the internal system id.
# @param sort [Hash] column => direction map; 1 means ascending,
#   anything else descending.
# @param offset [Integer] rows to skip (ignored when 0).
# @param limit [Integer] max rows to return (ignored when 0).
# @yield [Sequel::Dataset] when a block is given, yields the lazy dataset
#   instead of materializing it.
# @return [Array<Hash>] the matching records (unless a block is given).
def all(where: {}, fields: [], sort: {}, offset: 0, limit: 0)
  res = client[settings.read_dataset_name.to_sym]

  # if there is no fields, automatically
  # select all the fields expect the system _id
  fields = res.columns.reject { |x| x == SYSTEM_ID } if fields.blank?

  res = res.select(*fields.map(&:to_sym)) if fields.present?
  res = apply_query(res, where)

  # Each sort key adds an ORDER BY clause; 1 => asc, otherwise desc.
  (sort || {}).each do |k, v|
    sort_value = v == 1 ? k.to_sym : Sequel.desc(k.to_sym)
    res = res.order(sort_value)
  end

  res = res.offset(offset) if offset > 0
  res = res.limit(limit) if limit > 0

  if block_given?
    yield res
  else
    res.to_a
  end
end

#count(where: {}) ⇒ Object

count the number of records



135
136
137
138
139
140
141
# File 'lib/dataflow/adapters/sql_adapter.rb', line 135

# Count the records on the read dataset that match the given query.
# Returns 0 on any database error (e.g. the table does not exist yet).
def count(where: {})
  dataset = apply_query(client[settings.read_dataset_name.to_sym], where)
  dataset.count
rescue Sequel::DatabaseError
  0
end

#create_indexes(dataset: nil, type: :all) ⇒ Object

Create the indexes on this dataset. TODO: add support for a :drop_retry_on_error parameter.

Parameters:

  • dataset (String) (defaults to: nil)

Specify on which dataset the operation will be performed. Default: the adapter’s settings’ dataset.

  • type (Symbol) (defaults to: :all)

    select which indexes type to create. Can be :all (default), :unique_only, :non_unique_only.



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/dataflow/adapters/sql_adapter.rb', line 185

# Create the indexes on this dataset.
# TODO: add support for a :drop_retry_on_error parameter.
#
# @param dataset [String] Specify on which dataset the operation will be
#   performed. Default: the adapter's settings' write dataset.
# @param type [Symbol] select which indexes type to create.
#   Can be :all (default), :unique_only, :non_unique_only.
def create_indexes(dataset: nil, type: :all)
  dataset ||= settings.write_dataset_name
  dataset = dataset.to_sym
  indexes = (settings.indexes || [])

  # Narrow the index list by uniqueness when requested.
  case type
  when :unique_only
    indexes = indexes.select { |idx| idx['unique'] }
  when :non_unique_only
    indexes = indexes.reject { |idx| idx['unique'] }
  end

  indexes.each do |index|
    params = index_parameters(index)

    begin
      client.add_index(dataset, *params)
    rescue Sequel::DatabaseError => e
      # ignore index already exists
      # NOTE(review): only the PostgreSQL duplicate error is ignored here;
      # on MySQL an already-existing index would re-raise — confirm intended.
      raise e unless e.wrapped_exception.is_a?(PG::DuplicateTable)
    end
  end
end

#delete(where: {}) ⇒ Object

Note:

this deletes on the read dataset

Delete records that match the options. i.e. changes are seen immediately in the case of double buffered datasets

Parameters:

  • where (defaults to: {})

    query to apply on the delete operation.



160
161
162
163
164
# File 'lib/dataflow/adapters/sql_adapter.rb', line 160

# Delete records that match the options. Note that this operates on the
# READ dataset, so changes are seen immediately even for double-buffered
# datasets.
#
# @param where [Hash] query to apply on the delete operation.
def delete(where: {})
  scoped = apply_query(client[settings.read_dataset_name.to_sym], where)
  scoped.delete
end

#find(where: {}, fields: [], sort: {}, offset: 0) ⇒ Object

retrieve a single element from a data node



87
88
89
# File 'lib/dataflow/adapters/sql_adapter.rb', line 87

# Retrieve a single element from a data node, or nil when nothing matches.
# Delegates to #all with a limit of 1.
def find(where: {}, fields: [], sort: {}, offset: 0)
  results = all(where: where, fields: fields, sort: sort, offset: offset, limit: 1)
  results.first
end

#ordered_system_id_queries(batch_size:) ⇒ Object

Create queries that permit processing the whole dataset in parallel without using offsets.



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/dataflow/adapters/sql_adapter.rb', line 118

# Create queries that permit processing the whole dataset in parallel
# without using offsets: each query selects a half-open id range
# [from, to), except the last which is closed [from, to] so the final
# record is included.
#
# @param batch_size [Integer] target number of records per query.
# @return [Array<Hash>] one where-style query per batch.
def ordered_system_id_queries(batch_size:)
  # Fetch every system id in ascending order; batch boundaries are picked
  # from this sorted list.
  ids = all(fields: [SYSTEM_ID], sort: { SYSTEM_ID => 1 }).map { |x| x[SYSTEM_ID] }
  queries_count = (ids.size / batch_size.to_f).ceil
  Array.new(queries_count) do |i|
    from = ids[i * batch_size]
    # The last batch may be short, so fall back to the final id.
    to = ids[(i + 1) * batch_size] || ids[-1]
    is_last = i == queries_count - 1

    where_query = { SYSTEM_ID => { '>=' => from } }
    # '<' avoids overlap between consecutive batches; '<=' on the last
    # batch makes the final id inclusive.
    operator = is_last ? '<=' : '<'
    where_query[SYSTEM_ID][operator] = to

    where_query
  end
end

#recreate_dataset(dataset: nil) ⇒ Object

recreate the table/collection



167
168
169
170
171
172
173
174
175
176
177
# File 'lib/dataflow/adapters/sql_adapter.rb', line 167

# Recreate the table/collection: drop it (if present) and rebuild it from
# the cached schema. Aborts with a warning when no schema is known, since
# the columns cannot be derived.
def recreate_dataset(dataset: nil)
  dataset ||= settings.write_dataset_name.to_sym
  client.drop_table?(dataset)

  if @schema.present?
    create_table(dataset, @schema)
  else
    p 'WARNING: recreate dataset aborted: no schema'
    nil
  end
end

#save(records:) ⇒ Object

Save the given records TODO: support :replace_by parameter



145
146
147
148
149
150
151
152
153
154
# File 'lib/dataflow/adapters/sql_adapter.rb', line 145

# Save the given records to the write dataset, skipping rows that violate
# unique constraints (insert_ignore).
# TODO: support :replace_by parameter.
#
# @param records [Array<Hash>] records to persist.
#   NOTE(review): columns come back from Sequel as symbols, so this
#   assumes records are symbol-keyed — confirm against callers.
def save(records:)
  dataset = client[settings.write_dataset_name.to_sym]
  # Never write the internal system id; the DB assigns it.
  columns = dataset.columns.reject { |x| x == SYSTEM_ID }

  # Convert each record hash into a row array aligned with `columns`
  # (missing keys become nil) for the bulk import.
  tabular_data = records.map do |record|
    columns.map { |col| record[col] }
  end

  dataset.insert_ignore.import(columns, tabular_data)
end

#set_schema(schema) ⇒ Object



82
83
84
# File 'lib/dataflow/adapters/sql_adapter.rb', line 82

# Replace the cached table schema (used e.g. by #recreate_dataset).
#
# @param schema [Array] the schema definition for the dataset's table.
def set_schema(schema)
  @schema = schema
end

#update_settings(args) ⇒ Object



78
79
80
# File 'lib/dataflow/adapters/sql_adapter.rb', line 78

# Re-parse the raw settings hash into a Settings object and cache it.
#
# @param args [Hash] raw connection settings.
def update_settings(args)
  @settings = Dataflow::Adapters::Settings.new(args)
end

#usage(dataset:) ⇒ Object



209
210
211
212
213
# File 'lib/dataflow/adapters/sql_adapter.rb', line 209

# Report storage usage for the given dataset, augmented with the indexes
# that currently exist on it.
def usage(dataset:)
  effective = retrieve_collection_indexes(dataset)
  fetch_table_usage(dataset: dataset).merge(effective_indexes: effective)
end