Class: Dataflow::Adapters::SqlAdapter
- Inherits:
-
Object
- Object
- Dataflow::Adapters::SqlAdapter
- Defined in:
- lib/dataflow/adapters/sql_adapter.rb
Overview
Interface between a data node and a SQL database (MySQL or PostgreSQL, accessed through Sequel). We use the SQL database to perform all the store/retrieve operations.
Direct Known Subclasses
Constant Summary collapse
- SYSTEM_ID =
:_id
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#settings ⇒ Object
readonly
Returns the value of attribute settings.
Class Method Summary collapse
-
.client(settings, db_name: nil) ⇒ Sequel::Database
Get (or create) a client that satisfies the given connection settings.
-
.disconnect_clients ⇒ Object
Force the clients to disconnect their connections.
-
.try_create_db(uri, db_name, user, password) ⇒ Boolean
Used internally to try to create the DB automatically.
Instance Method Summary collapse
-
#all(where: {}, fields: [], sort: {}, offset: 0, limit: 0) ⇒ Object
retrieve all elements from a data node.
-
#count(where: {}) ⇒ Object
count the number of records.
-
#create_indexes(dataset: nil, type: :all) ⇒ Object
Create the indexes on this dataset.
-
#delete(where: {}) ⇒ Object
Delete records that match the options.
-
#find(where: {}, fields: [], sort: {}, offset: 0) ⇒ Object
retrieve a single element from a data node.
-
#initialize(args) ⇒ SqlAdapter
constructor
A new instance of SqlAdapter.
-
#ordered_system_id_queries(batch_size:) ⇒ Object
Create queries that permit processing the whole dataset in parallel without using offsets.
-
#recreate_dataset(dataset: nil) ⇒ Object
recreate the table/collection.
-
#save(records:) ⇒ Object
Save the given records. TODO: support the :replace_by parameter.
- #set_schema(schema) ⇒ Object
- #update_settings(args) ⇒ Object
- #usage(dataset:) ⇒ Object
Constructor Details
#initialize(args) ⇒ SqlAdapter
Returns a new instance of SqlAdapter.
72 73 74 75 76 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 72
# Builds a new SqlAdapter: stores the settings built from +args+, obtains a
# client for those settings and remembers the schema they declare.
#
# @param args forwarded unchanged to #update_settings.
def initialize(args)
  update_settings(args)
  @client = SqlAdapter.client(settings)
  # Fall back to an empty schema when the settings do not provide one.
  # TODO: detect if the table schema has a mis-match
  declared_schema = settings.schema
  @schema = declared_schema || []
end
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
70 71 72 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 70
# Read-only accessor.
#
# @return [Object] the value of the @client instance variable.
def client
  @client
end
#settings ⇒ Object (readonly)
Returns the value of attribute settings.
69 70 71 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 69
# Read-only accessor.
#
# @return [Object] the value of the @settings instance variable.
def settings
  @settings
end
Class Method Details
.client(settings, db_name: nil) ⇒ Sequel::Database
Get (or create) a client that satisfies the given connection settings.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 12
# Get (or create) a client that satisfies the given connection settings.
# Clients are cached in @clients, keyed by their connection URI, so asking
# twice for the same URI returns the same object.
#
# Host, port, user and password are read from the MOJACO_MYSQL_* or
# MOJACO_POSTGRESQL_* environment variables depending on the adapter type.
#
# @param settings the adapter settings; adapter_type, db_name and
#   connection_uri are read from it.
# @param db_name [String, nil] overrides settings.db_name when given.
# @return [Sequel::Database] the cached or newly created connection object.
def client(settings, db_name: nil)
  @clients ||= {}

  case settings.adapter_type
  when 'mysql2'
    host = ENV['MOJACO_MYSQL_ADDRESS'] || '127.0.0.1'
    port = ENV['MOJACO_MYSQL_PORT'] || '3306'
    user = ENV['MOJACO_MYSQL_USER']
    password = ENV['MOJACO_MYSQL_PASSWORD']
  when 'postgresql'
    host = ENV['MOJACO_POSTGRESQL_ADDRESS'] || '127.0.0.1'
    port = ENV['MOJACO_POSTGRESQL_PORT'] || '5432'
    user = ENV['MOJACO_POSTGRESQL_USER']
    password = ENV['MOJACO_POSTGRESQL_PASSWORD']
  end

  db_name ||= settings.db_name

  # `to_s` keeps a missing user variable from raising on `nil + String`
  # when only the password is configured.
  user_password = user.to_s
  user_password += ":#{password}" if password.present?

  uri = "#{settings.adapter_type}://#{user_password}@#{host}:#{port}"
  connection_uri = settings.connection_uri || "#{uri}/#{db_name}"

  return @clients[connection_uri] if @clients[connection_uri].present?

  # first, make sure the DB is created (if it is not an external db)
  is_external_db = settings.connection_uri.present?
  try_create_db(uri, db_name, user, password) unless is_external_db

  # then, create the connection object.
  # An externally supplied URI may already carry a query string; in that case
  # the encoding option has to be appended with '&' rather than a second '?'.
  separator = connection_uri.include?('?') ? '&' : '?'
  @clients[connection_uri] ||= Sequel.connect("#{connection_uri}#{separator}encoding=utf8")
end
.disconnect_clients ⇒ Object
Force the clients to disconnect their connections. Use before forking.
61 62 63 64 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 61
# Force the clients to disconnect their connections. Use before forking.
# Every client currently held in the @clients cache receives #disconnect.
def disconnect_clients
  cached_clients = (@clients ||= {}).values
  cached_clients.each { |cached_client| cached_client.disconnect }
end
.try_create_db(uri, db_name, user, password) ⇒ Boolean
Used internally to try to create the DB automatically.
49 50 51 52 53 54 55 56 57 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 49
# Used internally to try to create the DB automatically.
#
# NOTE: db_name is interpolated as-is into the CREATE DATABASE statement.
#
# @param uri [String] connection URI of the server (without database name).
# @param db_name [String] name of the database to create.
# @param user [String, nil] user passed to Sequel.connect.
# @param password [String, nil] password passed to Sequel.connect.
# @return [Boolean] true when the statement ran, false when a
#   Sequel::DatabaseError was raised (the error is deliberately ignored).
def try_create_db(uri, db_name, user, password)
  begin
    Sequel.connect(uri, user: user, password: password) do |connection|
      connection.run("CREATE DATABASE #{db_name}")
      true
    end
  rescue Sequel::DatabaseError
    # ignore error
    false
  end
end
Instance Method Details
#all(where: {}, fields: [], sort: {}, offset: 0, limit: 0) ⇒ Object
retrieve all elements from a data node
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 92
# Retrieve all elements from a data node (read from the read dataset).
#
# @param where [Hash] query forwarded to apply_query.
# @param fields [Array] columns to select; when blank, every column except
#   the system id is selected.
# @param sort [Hash, nil] column => direction; a value of 1 sorts ascending,
#   any other value sorts descending. Keys are applied in hash order.
# @param offset [Integer] rows to skip; only applied when greater than 0.
# @param limit [Integer] maximum rows; only applied when greater than 0.
# @yield [dataset] when a block is given, the prepared dataset is yielded
#   and the block's value is returned instead of the materialised rows.
# @return [Array, Object] the rows as an array, or the block's result.
def all(where: {}, fields: [], sort: {}, offset: 0, limit: 0)
  res = client[settings.read_dataset_name.to_sym]

  # if there are no fields, automatically
  # select all the fields except the system _id
  fields = res.columns.reject { |x| x == SYSTEM_ID } if fields.blank?
  res = res.select(*fields.map(&:to_sym)) if fields.present?

  res = apply_query(res, where)

  # Dataset#order replaces any ordering set before it, so calling it once per
  # key would keep only the last key. Collect every key and order in one call.
  ordering = (sort || {}).map do |k, v|
    v == 1 ? k.to_sym : Sequel.desc(k.to_sym)
  end
  res = res.order(*ordering) unless ordering.empty?

  res = res.offset(offset) if offset > 0
  res = res.limit(limit) if limit > 0

  if block_given?
    yield res
  else
    res.to_a
  end
end
#count(where: {}) ⇒ Object
count the number of records
135 136 137 138 139 140 141 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 135
# Count the number of records of the read dataset that match +where+.
#
# @param where [Hash] query forwarded to apply_query.
# @return [Integer] the count, or 0 when a Sequel::DatabaseError is raised.
def count(where: {})
  table = client[settings.read_dataset_name.to_sym]
  apply_query(table, where).count
rescue Sequel::DatabaseError
  0
end
#create_indexes(dataset: nil, type: :all) ⇒ Object
Create the indexes on this dataset. TODO: add support for a :drop_retry_on_error parameter.
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 185
# Create the indexes on this dataset.
# TODO: add support for a :drop_retry_on_error parameter.
#
# @param dataset [String, Symbol, nil] table to index; defaults to the
#   write dataset from the settings.
# @param type [Symbol] :all (default), :unique_only or :non_unique_only.
# @raise [Sequel::DatabaseError] for any database error other than
#   "index already exists".
def create_indexes(dataset: nil, type: :all)
  dataset = (dataset || settings.write_dataset_name).to_sym
  indexes = settings.indexes || []

  case type
  when :unique_only
    indexes = indexes.select { |idx| idx['unique'] }
  when :non_unique_only
    indexes = indexes.reject { |idx| idx['unique'] }
  end

  indexes.each do |index|
    params = index_parameters(index)

    begin
      client.add_index(dataset, *params)
    rescue Sequel::DatabaseError => e
      # ignore index already exists
      raise unless index_already_exists?(e)
    end
  end
end

# Tells whether the driver error wrapped by +error+ reports an already existing index.
# The driver constants are guarded with `defined?` because only the driver of
# the adapter in use is necessarily loaded; referencing the other one directly
# would raise NameError and hide the real database error.
private def index_already_exists?(error)
  cause = error.wrapped_exception
  return true if defined?(PG::DuplicateTable) && cause.is_a?(PG::DuplicateTable)
  return false unless defined?(Mysql2::Error) && cause.is_a?(Mysql2::Error)

  # 1061 is MySQL's ER_DUP_KEYNAME ("Duplicate key name").
  cause.error_number == 1061
end
#delete(where: {}) ⇒ Object
Delete records that match the options.
This deletes on the read dataset, i.e. changes are seen immediately in the case of double buffered datasets.
160 161 162 163 164 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 160
# Delete the records of the read dataset that match +where+.
#
# @param where [Hash] query forwarded to apply_query.
# @return [Object] whatever the dataset's #delete returns.
def delete(where: {})
  target = client[settings.read_dataset_name.to_sym]
  apply_query(target, where).delete
end
#find(where: {}, fields: [], sort: {}, offset: 0) ⇒ Object
retrieve a single element from a data node
87 88 89 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 87
# Retrieve a single element from a data node: delegates to #all with a
# limit of 1 and returns the first result.
#
# @param where [Hash] see #all.
# @param fields [Array] see #all.
# @param sort [Hash] see #all.
# @param offset [Integer] see #all.
# @return [Object, nil] the first matching row, or nil when there is none.
def find(where: {}, fields: [], sort: {}, offset: 0)
  matches = all(where: where,
                fields: fields,
                sort: sort,
                offset: offset,
                limit: 1)
  matches.first
end
#ordered_system_id_queries(batch_size:) ⇒ Object
Create queries that permit processing the whole dataset in parallel without using offsets.
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 118
# Create queries that permit processing the whole dataset in parallel
# without using offsets. All system ids are fetched in ascending order and
# cut into consecutive ranges of +batch_size+ ids.
#
# @param batch_size [Integer] number of ids covered by each query.
# @return [Array<Hash>] one where-hash per batch, of the form
#   { SYSTEM_ID => { '>=' => from, '<' => to } }; the last batch uses '<='
#   so that the final id is included.
def ordered_system_id_queries(batch_size:)
  sorted_ids = all(fields: [SYSTEM_ID], sort: { SYSTEM_ID => 1 }).map { |row| row[SYSTEM_ID] }
  batch_count = (sorted_ids.size / batch_size.to_f).ceil
  final_batch = batch_count - 1

  Array.new(batch_count) do |batch|
    lower = sorted_ids[batch * batch_size]
    # The upper bound is the first id of the next batch, or the very last id
    # when there is no next batch.
    upper = sorted_ids[(batch + 1) * batch_size] || sorted_ids[-1]
    upper_operator = batch == final_batch ? '<=' : '<'

    { SYSTEM_ID => { '>=' => lower, upper_operator => upper } }
  end
end
#recreate_dataset(dataset: nil) ⇒ Object
recreate the table/collection
167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 167
# Recreate the table/collection.
#
# The table is dropped (if it exists) before the schema is checked, so when
# no schema is known the table is gone and only a warning is printed.
#
# @param dataset [Symbol, nil] table to recreate; defaults to the write
#   dataset from the settings.
# @return [Object, nil] the result of create_table, or nil when there is no schema.
def recreate_dataset(dataset: nil)
  dataset ||= settings.write_dataset_name.to_sym
  client.drop_table?(dataset)

  return create_table(dataset, @schema) if @schema.present?

  p 'WARNING: recreate dataset aborted: no schema'
  nil
end
#save(records:) ⇒ Object
Save the given records. TODO: support the :replace_by parameter.
145 146 147 148 149 150 151 152 153 154 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 145
# Save the given records into the write dataset.
# TODO: support :replace_by parameter
#
# Each record is turned into a row by looking up every table column (except
# the system id) with record[column]; rows are imported through insert_ignore.
#
# @param records [Array] records responding to #[] with the column names.
# @return [Object] whatever the dataset's #import returns.
def save(records:)
  table = client[settings.write_dataset_name.to_sym]
  column_names = table.columns.reject { |name| name == SYSTEM_ID }

  rows = records.map do |record|
    column_names.map { |name| record[name] }
  end

  table.insert_ignore.import(column_names, rows)
end
#set_schema(schema) ⇒ Object
82 83 84 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 82
# Replace the schema held by this adapter.
#
# @param schema the new schema; stored as-is in @schema.
# @return [Object] the schema that was given.
def set_schema(schema)
  @schema = schema
end
#update_settings(args) ⇒ Object
78 79 80 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 78
# Rebuild the adapter settings from +args+.
#
# @param args passed unchanged to Dataflow::Adapters::Settings.new.
# @return [Dataflow::Adapters::Settings] the newly stored settings.
def update_settings(args)
  @settings = Dataflow::Adapters::Settings.new(args)
end
#usage(dataset:) ⇒ Object
209 210 211 212 213 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 209
# Report usage information for a dataset: the result of fetch_table_usage
# with the dataset's indexes merged in under :effective_indexes.
#
# @param dataset the dataset (table) name to inspect.
# @return [Hash] the table usage merged with { effective_indexes: ... }.
def usage(dataset:)
  effective_indexes = retrieve_collection_indexes(dataset)
  fetch_table_usage(dataset: dataset).merge(effective_indexes: effective_indexes)
end