Class: Dataflow::Adapters::SqlAdapter
- Inherits: Object (hierarchy: Object → Dataflow::Adapters::SqlAdapter)
- Defined in:
- lib/dataflow/adapters/sql_adapter.rb
Overview
Interface between a data node and a SQL database (MySQL via the mysql2 adapter, or PostgreSQL). We use Sequel to perform all the store/retrieve operations on that database.
Direct Known Subclasses
Constant Summary
- SYSTEM_ID =
:_id
Instance Attribute Summary
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#settings ⇒ Object
readonly
Returns the value of attribute settings.
Class Method Summary
-
.client(settings, db_name: nil) ⇒ Sequel::Database
Get (or create) a client that satisfies the given connection settings.
-
.disconnect_clients ⇒ Object
Force the clients to disconnect their connections.
-
.try_create_db(uri, db_name, user, password) ⇒ Boolean
Used internally to try to create the DB automatically.
Instance Method Summary
-
#all(where: {}, fields: [], sort: {}, offset: 0, limit: 0) ⇒ Object
Retrieve all elements from a data node.
-
#count(where: {}) ⇒ Object
Count the number of records.
-
#create_indexes(dataset: nil, type: :all) ⇒ Object
Create the indexes on this dataset.
-
#delete(where: {}) ⇒ Object
Delete records that match the options.
-
#find(where: {}, fields: [], sort: {}, offset: 0) ⇒ Object
Retrieve a single element from a data node.
-
#initialize(args) ⇒ SqlAdapter
constructor
A new instance of SqlAdapter.
-
#ordered_system_id_queries(batch_size:) ⇒ Object
Create queries that permit processing the whole dataset in parallel without using offsets.
-
#recreate_dataset(dataset: nil) ⇒ Object
Recreate the table (drop it if it exists, then create it again).
-
#save(records:) ⇒ Object
Save the given records. TODO: support the :replace_by parameter.
- #update_settings(args) ⇒ Object
- #usage(dataset:) ⇒ Object
Constructor Details
#initialize(args) ⇒ SqlAdapter
Returns a new instance of SqlAdapter.
72 73 74 75 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 72
# Build an adapter from raw settings arguments.
#
# @param args [Object] forwarded to Dataflow::Adapters::Settings.new through #update_settings
def initialize(args)
  update_settings(args)
  # SqlAdapter.client caches one connection per connection URI, so adapters
  # whose settings resolve to the same URI share a single client.
  @client = SqlAdapter.client(settings)
end
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
70 71 72 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 70
# Read-only accessor for the database client.
#
# @return [Object] the client assigned in #initialize (from SqlAdapter.client)
def client
  @client
end
#settings ⇒ Object (readonly)
Returns the value of attribute settings.
69 70 71 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 69
# Read-only accessor for the adapter settings.
#
# @return [Object] the settings object stored by #update_settings
def settings
  @settings
end
Class Method Details
.client(settings, db_name: nil) ⇒ Sequel::Database
Get (or create) a client that satisfies the given connection settings.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 12
# Get (or create) a client that satisfies the given connection settings.
#
# Host, port and credentials come from the MOJACO_MYSQL_* (mysql2) or
# MOJACO_POSTGRESQL_* (postgresql) environment variables, with host defaulting
# to 127.0.0.1 and port to 3306 / 5432. A non-blank settings.connection_uri
# takes precedence and marks the DB as external, so it is never auto-created.
#
# @param settings [Object] must respond to adapter_type, db_name and connection_uri
# @param db_name [String, nil] overrides settings.db_name when given
# @return [Sequel::Database] a connection, cached per connection URI
def client(settings, db_name: nil)
  @clients ||= {}

  case settings.adapter_type
  when 'mysql2'
    host = ENV['MOJACO_MYSQL_ADDRESS'] || '127.0.0.1'
    port = ENV['MOJACO_MYSQL_PORT'] || '3306'
    user = ENV['MOJACO_MYSQL_USER']
    password = ENV['MOJACO_MYSQL_PASSWORD']
  when 'postgresql'
    host = ENV['MOJACO_POSTGRESQL_ADDRESS'] || '127.0.0.1'
    port = ENV['MOJACO_POSTGRESQL_PORT'] || '5432'
    user = ENV['MOJACO_POSTGRESQL_USER']
    password = ENV['MOJACO_POSTGRESQL_PASSWORD']
  end
  # NOTE(review): any other adapter_type leaves host/port/credentials nil, so
  # it only works when settings.connection_uri is provided.

  db_name ||= settings.db_name

  # to_s: a password may be configured without a user (nil + String would raise).
  user_password = user.to_s
  user_password += ":#{password}" if password.present?
  # NOTE(review): credentials are not URI-escaped; characters such as '@' or
  # '/' in a password will break this URI.

  uri = "#{settings.adapter_type}://#{user_password}@#{host}:#{port}"
  # presence: a blank connection_uri is treated exactly like a missing one,
  # consistent with the is_external_db check below.
  connection_uri = settings.connection_uri.presence || "#{uri}/#{db_name}"
  return @clients[connection_uri] if @clients[connection_uri].present?

  # first, make sure the DB is created (if it is not an external db)
  is_external_db = settings.connection_uri.present?
  try_create_db(uri, db_name, user, password) unless is_external_db

  # then, create the connection object. An external URI may already carry a
  # query string (e.g. ?sslmode=require), in which case '&' must be used.
  separator = connection_uri.include?('?') ? '&' : '?'
  @clients[connection_uri] ||= Sequel.connect("#{connection_uri}#{separator}encoding=utf8")
end
.disconnect_clients ⇒ Object
Force the clients to disconnect their connections. Use before forking.
61 62 63 64 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 61
# Force the clients to disconnect their connections. Use before forking.
#
# The client cache itself is left intact; only #disconnect is called on each
# cached client.
#
# @return [Array] the cached clients
def disconnect_clients
  @clients ||= {}
  @clients.values.each { |db| db.disconnect }
end
.try_create_db(uri, db_name, user, password) ⇒ Boolean
Used internally to try to create the DB automatically.
49 50 51 52 53 54 55 56 57 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 49
# Used internally to try to create the DB automatically.
#
# Connects to the server URI (no database path), issues CREATE DATABASE and
# disconnects. Creation is best-effort: any Sequel::DatabaseError is ignored
# (presumably most often because the DB already exists — assumption).
#
# @param uri [String] server URI without the database name
# @param db_name [String] database to create (interpolated as-is into the SQL)
# @param user [String, nil]
# @param password [String, nil]
# @return [Boolean] true if the statement ran, false on a database error
def try_create_db(uri, db_name, user, password)
  begin
    Sequel.connect(uri, user: user, password: password) do |db|
      db.run("CREATE DATABASE #{db_name}")
      true
    end
  rescue Sequel::DatabaseError
    false
  end
end
Instance Method Details
#all(where: {}, fields: [], sort: {}, offset: 0, limit: 0) ⇒ Object
Retrieve all elements from a data node.
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 88
# Retrieve all elements from a data node (reads settings.read_dataset_name).
#
# @param where [Hash] filter, translated by #apply_query
# @param fields [Array] columns to select; when blank, every column except SYSTEM_ID
# @param sort [Hash] column => direction; 1 sorts ascending, any other value
#   descending. Keys are applied in hash order, the first being the primary sort key.
# @param offset [Integer] rows to skip (ignored unless > 0)
# @param limit [Integer] maximum rows to return (ignored unless > 0)
# @yield [res] the built dataset, before it is materialized into an array
# @return [Array] the matching rows, or the block's value when a block is given
def all(where: {}, fields: [], sort: {}, offset: 0, limit: 0)
  res = client[settings.read_dataset_name.to_sym]

  # if there are no fields, automatically
  # select all the fields except the system _id
  fields = res.columns.reject { |x| x == SYSTEM_ID } if fields.blank?

  res = res.select(*fields.map(&:to_sym)) if fields.present?
  res = apply_query(res, where)

  # Sequel's Dataset#order replaces any existing ORDER BY, so every sort key
  # must go into a single call; calling it once per key would keep only the last key.
  order_terms = (sort || {}).map do |k, v|
    v == 1 ? k.to_sym : Sequel.desc(k.to_sym)
  end
  res = res.order(*order_terms) unless order_terms.empty?

  res = res.offset(offset) if offset > 0
  res = res.limit(limit) if limit > 0

  if block_given?
    yield res
  else
    res.to_a
  end
end
#count(where: {}) ⇒ Object
Count the number of records.
131 132 133 134 135 136 137 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 131
# Count the number of records in the read dataset that match `where`.
#
# @param where [Hash] filter, translated by #apply_query
# @return [Integer] the count, or 0 when the database raises a Sequel::DatabaseError
def count(where: {})
  begin
    table = client[settings.read_dataset_name.to_sym]
    apply_query(table, where).count
  rescue Sequel::DatabaseError
    # Best effort: any database error (e.g. a table that does not exist yet —
    # assumption) is reported as zero records.
    0
  end
end
#create_indexes(dataset: nil, type: :all) ⇒ Object
Create the indexes on this dataset. TODO: add support for a :drop_retry_on_error parameter.
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 175
# Create the indexes on this dataset.
# TODO: add support for a :drop_retry_on_error parameter.
#
# @param dataset [String, Symbol, nil] table name; defaults to settings.write_dataset_name
# @param type [Symbol] :unique_only or :non_unique_only to filter settings.indexes
#   by their 'unique' flag; any other value (e.g. :all) creates every index
# @return [Array] the indexes that were processed
# @raise [Sequel::DatabaseError] for any failure other than the PostgreSQL
#   "already exists" (ignored) and "undefined column" (logged) cases
def create_indexes(dataset: nil, type: :all)
  dataset ||= settings.write_dataset_name
  dataset = dataset.to_sym
  indexes = (settings.indexes || [])

  case type
  when :unique_only
    indexes = indexes.select { |idx| idx['unique'] }
  when :non_unique_only
    indexes = indexes.reject { |idx| idx['unique'] }
  end

  # PG is only defined when the pg gem is loaded. Without this guard, a
  # database error on another adapter (e.g. mysql2) would be masked by a
  # NameError raised while inspecting it.
  pg_loaded = defined?(PG::DuplicateTable) && defined?(PG::UndefinedColumn)

  indexes.each do |index|
    params = index_parameters(index)

    begin
      client.add_index(dataset, *params)
    rescue Sequel::DatabaseError => e
      wrapped = e.wrapped_exception

      # ignore index already exists
      next if pg_loaded && wrapped.is_a?(PG::DuplicateTable)

      # log columns not found but do not raise an error
      if pg_loaded && wrapped.is_a?(PG::UndefinedColumn)
        logger.log("[Error] add_index on #{dataset} failed. #{e}")
        next
      end

      # re-raise for everything else
      raise
    end
  end
end
#delete(where: {}) ⇒ Object
Delete records that match the options.
This deletes on the read dataset, i.e. changes are seen immediately in the case of double-buffered datasets.
156 157 158 159 160 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 156
# Delete records that match the options.
# This deletes on the read dataset, i.e. changes are seen immediately in the
# case of double buffered datasets.
#
# @param where [Hash] filter, translated by #apply_query
# @return the value of Sequel's Dataset#delete (presumably the deleted row count)
def delete(where: {})
  apply_query(client[settings.read_dataset_name.to_sym], where).delete
end
#find(where: {}, fields: [], sort: {}, offset: 0) ⇒ Object
Retrieve a single element from a data node.
83 84 85 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 83
# Retrieve a single element from a data node.
#
# Same options as #all, with the limit fixed to 1.
#
# @return the first matching record, or nil when nothing matches
def find(where: {}, fields: [], sort: {}, offset: 0)
  matches = all(where: where, fields: fields, sort: sort, offset: offset, limit: 1)
  matches.first
end
#ordered_system_id_queries(batch_size:) ⇒ Object
Create queries that permit processing the whole dataset in parallel without using offsets.
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 114
# Create queries that permit processing the whole dataset in parallel without
# using offsets.
#
# All SYSTEM_ID values are loaded in ascending order and cut into batches of
# batch_size ids. Each query is a `where` hash bounding one batch: '>=' its
# first id and '<' the first id of the next batch. The last query is instead
# closed with '<=' on the final id, so no record is left out.
#
# @param batch_size [Numeric] number of ids per query; must be positive
# @return [Array<Hash>] one where-clause per batch (empty when there is no data)
# @raise [ArgumentError] when batch_size is not a positive number
def ordered_system_id_queries(batch_size:)
  # Fail fast: 0 or nil would otherwise fail with an opaque FloatDomainError
  # (Infinity/NaN#ceil), and only after every id has been loaded.
  unless batch_size.is_a?(Numeric) && batch_size.positive?
    raise ArgumentError, "batch_size must be a positive number, got #{batch_size.inspect}"
  end

  ids = all(fields: [SYSTEM_ID], sort: { SYSTEM_ID => 1 }).map { |x| x[SYSTEM_ID] }
  queries_count = (ids.size / batch_size.to_f).ceil

  Array.new(queries_count) do |i|
    from = ids[i * batch_size]
    if i == queries_count - 1
      { SYSTEM_ID => { '>=' => from, '<=' => ids.last } }
    else
      { SYSTEM_ID => { '>=' => from, '<' => ids[(i + 1) * batch_size] } }
    end
  end
end
#recreate_dataset(dataset: nil) ⇒ Object
Recreate the table (drop it if it exists, then create it again from the adapter's schema).
163 164 165 166 167 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 163
# Recreate the table: drop it if it exists, then create it again from the
# adapter's schema (@schema, set by #update_settings).
#
# @param dataset [String, Symbol, nil] table name; when nil, the symbolized
#   settings.write_dataset_name (a caller-supplied name is passed through as-is)
# @return the value returned by #create_table
def recreate_dataset(dataset: nil)
  table = dataset || settings.write_dataset_name.to_sym
  client.drop_table?(table)
  create_table(table, @schema)
end
#save(records:) ⇒ Object
Save the given records. TODO: support the :replace_by parameter.
141 142 143 144 145 146 147 148 149 150 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 141
# Save the given records into settings.write_dataset_name.
# TODO: support :replace_by parameter
#
# Every table column except SYSTEM_ID is written, taking record[column] for
# each one (nil for a plain Hash that lacks the key). Rows go through Sequel's
# insert_ignore, so rows conflicting with existing ones are skipped instead of
# raising.
#
# @param records [Array] records indexable by column name (Symbol)
# @return the value of the Sequel import call
def save(records:)
  dataset = client[settings.write_dataset_name.to_sym]
  columns = dataset.columns.reject { |name| name == SYSTEM_ID }

  rows = records.map { |record| columns.map { |name| record[name] } }
  dataset.insert_ignore.import(columns, rows)
end
#update_settings(args) ⇒ Object
77 78 79 80 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 77
# Replace this adapter's settings and store the schema they describe.
#
# NOTE(review): @client is not rebuilt here, so connection-related changes in
# `args` are not picked up by an already-initialized adapter.
#
# @param args [Object] forwarded to Dataflow::Adapters::Settings.new
# @return [Object] the new settings' schema
def update_settings(args)
  new_settings = Dataflow::Adapters::Settings.new(args)
  @settings = new_settings
  @schema = new_settings.schema
end
#usage(dataset:) ⇒ Object
208 209 210 211 212 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 208
# Describe a table's usage: whatever #fetch_table_usage reports, plus the
# table's indexes under :effective_indexes.
#
# @param dataset [String, Symbol] the table to inspect
# @return [Hash]
def usage(dataset:)
  effective_indexes = retrieve_collection_indexes(dataset)
  fetch_table_usage(dataset: dataset).merge(effective_indexes: effective_indexes)
end