Class: Dataflow::Adapters::SqlAdapter
- Inherits: Object
- Defined in: lib/dataflow/adapters/sql_adapter.rb
Overview
Interface between a data node and a SQL database. We use Sequel to perform all the store/retrieve operations.
Direct Known Subclasses
Constant Summary
- SYSTEM_ID = :_id
Instance Attribute Summary
- #client ⇒ Object (readonly)
  Returns the value of attribute client.
- #settings ⇒ Object (readonly)
  Returns the value of attribute settings.
Class Method Summary
- .add_extensions(settings, db) ⇒ Object
  Load Sequel extensions based on the adapter type.
- .client(settings) ⇒ Sequel::Database
  Get (or create) a client that satisfies the given connection settings.
- .disconnect_clients ⇒ Object
  Force the clients to disconnect their connections.
- .try_create_db(uri, db_name) ⇒ Boolean
  Used internally to try to create the DB automatically.
Instance Method Summary
- #all(where: {}, fields: [], sort: {}, offset: 0, limit: 0) ⇒ Object
  Retrieve all elements from a data node.
- #all_paginated(where: {}, fields: [], cursor: nil) ⇒ Object
- #count(where: {}) ⇒ Object
  Count the number of records.
- #create_indexes(dataset: nil, type: :all) ⇒ Object
  Create the indexes on this dataset.
- #delete(where: {}) ⇒ Object
  Delete records that match the options.
- #drop_dataset(dataset) ⇒ Object
  Drops the given dataset.
- #find(where: {}, fields: [], sort: {}, offset: 0) ⇒ Object
  Retrieve a single element from a data node.
- #initialize(args) ⇒ SqlAdapter (constructor)
  A new instance of SqlAdapter.
- #ordered_system_id_queries(batch_size:, where: {}) ⇒ Object
  Create queries that permit processing the whole dataset in parallel without using offsets.
- #recreate_dataset(dataset: nil) ⇒ Object
  Recreate the table/collection.
- #save(records:, replace_by: nil) ⇒ Object
  Save the given records.
- #transform_to_query(opts) ⇒ Object
- #update_settings(args) ⇒ Object
- #usage(dataset:) ⇒ Object
Constructor Details
#initialize(args) ⇒ SqlAdapter
Returns a new instance of SqlAdapter.
# File 'lib/dataflow/adapters/sql_adapter.rb', line 63

def initialize(args)
  update_settings(args)
  @client = SqlAdapter.client(settings)
end
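A usage sketch follows. Only adapter_type, db_name and connection_uri are visibly read from the settings in this file; the dataset_name key is an assumption about what Dataflow::Adapters::Settings accepts:

# Hypothetical construction example. The args hash is forwarded verbatim
# to Dataflow::Adapters::Settings.new (see #update_settings).
adapter = Dataflow::Adapters::SqlAdapter.new(
  adapter_type: 'postgresql',   # drives .add_extensions
  db_name: 'dataflow_example',  # used by .client and .try_create_db
  dataset_name: 'my_dataset'    # assumed key for the read/write dataset names
)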
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
# File 'lib/dataflow/adapters/sql_adapter.rb', line 61

def client
  @client
end
#settings ⇒ Object (readonly)
Returns the value of attribute settings.
# File 'lib/dataflow/adapters/sql_adapter.rb', line 60

def settings
  @settings
end
Class Method Details
.add_extensions(settings, db) ⇒ Object
Load Sequel extensions based on the adapter type.
# File 'lib/dataflow/adapters/sql_adapter.rb', line 42

def add_extensions(settings, db)
  if settings.adapter_type == 'postgresql'
    db.extension(:pg_array)
    # db.extension(:pg_json)
    db.extension(:pg_loose_count)
  end
end
.client(settings) ⇒ Sequel::Database
Get (or create) a client that satisfies the given connection settings.
# File 'lib/dataflow/adapters/sql_adapter.rb', line 12

def client(settings)
  @clients ||= {}
  connection_uri = settings.connection_uri_or_default
  return @clients[connection_uri] if @clients[connection_uri].present?

  # first, make sure the DB is created (if it is not an external db)
  is_external_db = settings.connection_uri.present?
  try_create_db(connection_uri, settings.db_name) unless is_external_db

  # then, create the connection object
  db = Sequel.connect("#{connection_uri}/#{settings.db_name}?encoding=utf8")
  add_extensions(settings, db)
  @clients[connection_uri] = db
end
.disconnect_clients ⇒ Object
Force the clients to disconnect their connections. Use before forking.
# File 'lib/dataflow/adapters/sql_adapter.rb', line 52

def disconnect_clients
  @clients ||= {}
  @clients.values.each(&:disconnect)
end
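For instance, in a pre-fork setup (a sketch; worker is a placeholder):

# Drop pooled connections before forking so each child process opens its
# own; sharing a Sequel connection across a fork is unsafe.
Dataflow::Adapters::SqlAdapter.disconnect_clients
pid = fork { worker.run }
Process.wait(pid)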
.try_create_db(uri, db_name) ⇒ Boolean
Used internally to try to create the DB automatically.
# File 'lib/dataflow/adapters/sql_adapter.rb', line 31

def try_create_db(uri, db_name)
  Sequel.connect(uri) do |db|
    db.run("CREATE DATABASE #{db_name}")
    true
  end
rescue Sequel::DatabaseError => e
  # ignore error
  false
end
Instance Method Details
#all(where: {}, fields: [], sort: {}, offset: 0, limit: 0) ⇒ Object
Retrieve all elements from a data node.
# File 'lib/dataflow/adapters/sql_adapter.rb', line 79

def all(where: {}, fields: [], sort: {}, offset: 0, limit: 0)
  res = client[settings.read_dataset_name.to_sym]

  # if there are no fields, automatically
  # select all the fields except the system _id
  fields = res.columns.reject { |x| x == SYSTEM_ID } if fields.blank?
  res = res.select(*fields.map(&:to_sym)) if fields.present?

  res = apply_query(res, where)

  (sort || {}).each do |k, v|
    sort_value = v == 1 ? k.to_sym : Sequel.desc(k.to_sym)
    res = res.order_append(sort_value)
  end

  res = res.offset(offset) if offset > 0
  res = res.limit(limit) if limit > 0

  if block_given?
    yield res
  else
    res.to_a
  end
end
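A few hedged call examples, assuming an adapter built as in the constructor sketch above. The sort convention (1 for ascending, anything else descending) follows directly from the code:

# Select two columns, filter, sort ascending by id, and page through:
adapter.all(where: { 'status' => 'active' },
            fields: [:id, :name],
            sort: { 'id' => 1 },
            offset: 10,
            limit: 5)

# The block form yields the underlying Sequel dataset instead of an
# array, which is useful for streaming large result sets:
adapter.all(where: { 'id' => [1, 2, 3] }) do |dataset|
  dataset.each { |row| puts row }
end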
#all_paginated(where: {}, fields: [], cursor: nil) ⇒ Object
# File 'lib/dataflow/adapters/sql_adapter.rb', line 104

def all_paginated(where: {}, fields: [], cursor: nil)
  # for now, retrieve all records at once
  { 'data' => all(where: where, fields: fields), 'next_cursor' => '' }
end
#count(where: {}) ⇒ Object
Count the number of records.
# File 'lib/dataflow/adapters/sql_adapter.rb', line 127

def count(where: {})
  res = client[settings.read_dataset_name.to_sym]
  res = apply_query(res, where)
  res.count
rescue Sequel::DatabaseError
  0
end
#create_indexes(dataset: nil, type: :all) ⇒ Object
Create the indexes on this dataset. TODO: add support for a :drop_retry_on_error parameter.
# File 'lib/dataflow/adapters/sql_adapter.rb', line 192

def create_indexes(dataset: nil, type: :all)
  dataset ||= settings.write_dataset_name
  dataset = dataset.to_sym
  indexes = (settings.indexes || [])

  case type
  when :unique_only
    indexes = indexes.select { |idx| idx['unique'] }
  when :non_unique_only
    indexes = indexes.reject { |idx| idx['unique'] }
  end

  indexes.each do |index|
    params = index_parameters(index)
    begin
      client.add_index(dataset, *params)
    rescue Sequel::DatabaseError => e
      # ignore index already exists
      next if e.wrapped_exception.is_a?(PG::DuplicateTable)

      # log columns not found but do not raise an error
      if e.wrapped_exception.is_a?(PG::UndefinedColumn)
        logger.error(custom_message: "add_index on #{dataset} failed.", error: e)
        next
      end

      # re-raise for everything else
      raise e
    end
  end
end
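A sketch of the expected index declarations. Only the 'unique' flag is visible in this file; the 'key' field is an assumption about what the (private) index_parameters helper consumes:

# Hypothetical index settings passed at construction time:
adapter = Dataflow::Adapters::SqlAdapter.new(
  db_name: 'dataflow_example',
  dataset_name: 'my_dataset',
  indexes: [
    { 'key' => 'id', 'unique' => true },
    { 'key' => %w(updated_at) }
  ]
)
adapter.create_indexes(type: :unique_only) # only the unique index is created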
#delete(where: {}) ⇒ Object
Delete records that match the options. This deletes on the read dataset, i.e. changes are seen immediately in the case of double-buffered datasets.
# File 'lib/dataflow/adapters/sql_adapter.rb', line 168

def delete(where: {})
  res = client[settings.read_dataset_name.to_sym]
  res = apply_query(res, where)
  res.delete
end
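For example (same hypothetical adapter as above; the where clause accepts the operator forms listed under #transform_to_query):

# Remove every stale row from the read dataset; readers see the
# deletion immediately.
adapter.delete(where: { 'status' => 'stale' })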
#drop_dataset(dataset) ⇒ Object
Drops the given dataset.
# File 'lib/dataflow/adapters/sql_adapter.rb', line 182

def drop_dataset(dataset)
  client.drop_table?(dataset)
end
#find(where: {}, fields: [], sort: {}, offset: 0) ⇒ Object
Retrieve a single element from a data node.
# File 'lib/dataflow/adapters/sql_adapter.rb', line 74

def find(where: {}, fields: [], sort: {}, offset: 0)
  all(where: where, fields: fields, sort: sort, offset: offset, limit: 1).first
end
#ordered_system_id_queries(batch_size:, where: {}) ⇒ Object
Create queries that permit processing the whole dataset in parallel without using offsets.
# File 'lib/dataflow/adapters/sql_adapter.rb', line 110

def ordered_system_id_queries(batch_size:, where: {})
  ids = all(fields: [SYSTEM_ID], where: where, sort: { SYSTEM_ID => 1 }).map { |x| x[SYSTEM_ID] }
  queries_count = (ids.size / batch_size.to_f).ceil
  Array.new(queries_count) do |i|
    from = ids[i * batch_size]
    to = ids[(i + 1) * batch_size] || ids[-1]
    is_last = i == queries_count - 1

    where_query = { SYSTEM_ID => { '>=' => from } }
    operator = is_last ? '<=' : '<'
    where_query[SYSTEM_ID][operator] = to

    where_query
  end
end
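Each returned query is a where clause covering one contiguous range of system ids, so the batches can be processed independently. A usage sketch:

# Process the dataset in windows of 1000 records; threads, processes
# or background jobs could each take one window:
adapter.ordered_system_id_queries(batch_size: 1000).each do |where_query|
  batch = adapter.all(where: where_query)
  # ... process batch ...
end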
#recreate_dataset(dataset: nil) ⇒ Object
Recreate the table/collection.
# File 'lib/dataflow/adapters/sql_adapter.rb', line 175

def recreate_dataset(dataset: nil)
  dataset ||= settings.write_dataset_name.to_sym
  drop_dataset(dataset)
  create_table(dataset, @schema, logger)
end
#save(records:, replace_by: nil) ⇒ Object
Save the given records.
# File 'lib/dataflow/adapters/sql_adapter.rb', line 140

def save(records:, replace_by: nil)
  dataset_name = settings.write_dataset_name.to_sym
  dataset = client[dataset_name]
  columns = dataset.columns.reject { |x| x == SYSTEM_ID }

  tabular_data = records.map do |record|
    columns.map { |col| record[col] }
  end

  if replace_by.present?
    index_keys = Array(replace_by).map { |c| c.to_sym }.uniq

    # On conflict update every field. On Postgresql we can refer
    # to the "conflicting" rows using the "excluded_" prefix:
    update_clause = columns.map { |k| [k, Sequel.qualify('excluded', k)] }.to_h
    dataset
      .insert_conflict(target: index_keys, update: update_clause)
      .import(columns, tabular_data)
  else
    # ignore insert conflicts
    dataset.insert_ignore.import(columns, tabular_data)
  end
end
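A hedged usage sketch. Values are extracted with record[col] against Sequel's symbol column names, so the record keys are symbols here; replace_by should match a unique index on the write dataset for insert_conflict to apply:

records = [
  { id: 1, name: 'alice' },
  { id: 2, name: 'bob' }
]

# Upsert keyed on the unique :id column:
adapter.save(records: records, replace_by: 'id')

# Plain insert: conflicting rows are silently skipped (insert_ignore).
adapter.save(records: records)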
#transform_to_query(opts) ⇒ Object
# File 'lib/dataflow/adapters/sql_adapter.rb', line 231

def transform_to_query(opts)
  # map to a series of AND clause queries
  opts.flat_map do |k, v|
    if v.is_a? Hash
      v.map do |operator, value|
        case operator
        when '!='
          if value.is_a? Array
            Sequel.lit("#{k} NOT IN ?", value)
          else
            Sequel.lit("#{k} <> ?", value)
          end
        when '<', '<=', '>', '>='
          Sequel.lit("#{k} #{operator} ?", value)
        when '~'
          Sequel.lit("#{k} #{regex_case_senstive_op} ?", value)
        when '~*'
          Sequel.lit("#{k} #{regex_case_insensitive_op} ?", value)
        end
      end
    else
      # e.g. simple matches { 'id' => 1 } or IN clauses { 'id' => [1, 2] }
      # are supported with simple hashes
      [[{ k.to_sym => v }]]
    end
  end
end
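The supported where-clause forms, as handled above (these reach this method when #all, #count, #delete, etc. apply their where argument, presumably via the private apply_query helper):

{ 'id' => 1 }                           # simple equality
{ 'id' => [1, 2, 3] }                   # IN clause
{ 'age' => { '>=' => 21, '<' => 65 } }  # range comparisons, ANDed together
{ 'name' => { '!=' => %w(alice bob) } } # NOT IN
{ 'name' => { '~*' => '^ali' } }        # case-insensitive regex match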
#update_settings(args) ⇒ Object
# File 'lib/dataflow/adapters/sql_adapter.rb', line 68

def update_settings(args)
  @settings = Dataflow::Adapters::Settings.new(args)
  @schema = @settings.schema
end
#usage(dataset:) ⇒ Object
# File 'lib/dataflow/adapters/sql_adapter.rb', line 225

def usage(dataset:)
  indexes = retrieve_collection_indexes(dataset)
  table_usage = fetch_table_usage(dataset: dataset)
  table_usage.merge(effective_indexes: indexes)
end