Class: Dataflow::Adapters::MongoDbAdapter

Inherits:
Object
Defined in:
lib/dataflow/adapters/mongo_db_adapter.rb

Overview

Interface between a data node and MongoDB. We use MongoDB to perform all the store/retrieve operations.

Constant Summary

SYSTEM_ID = '_id'

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(args) ⇒ MongoDbAdapter

Returns a new instance of MongoDbAdapter.



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 35

def initialize(args)
  update_settings(args)
  @client = MongoDbAdapter.client(settings)
  @admin_client = MongoDbAdapter.admin_client(settings)
end
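
A minimal construction sketch; the settings keys shown (db_name, dataset_name) are assumptions about what Dataflow::Adapters::Settings accepts, not confirmed by this page:

# hypothetical settings hash -- key names are assumptions
adapter = Dataflow::Adapters::MongoDbAdapter.new(
  db_name: 'dataflow_example',
  dataset_name: 'users'
)
adapter.count # => number of records in the backing collection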

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 33

def client
  @client
end

#settings ⇒ Object (readonly)

Returns the value of attribute settings.



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 32

def settings
  @settings
end

Class Method Details

.admin_client(settings) ⇒ Object



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 19

def admin_client(settings)
  return @admin_client if @admin_client
  @admin_client = client(settings, db_name: 'admin')
end

.client(settings, db_name: nil) ⇒ Object



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 10

def client(settings, db_name: nil)
  @clients ||= {}
  host = ENV['MOJACO_MONGO_ADDRESS'] || '127.0.0.1'
  port = '27017'
  connection_uri = settings.connection_uri || "#{host}:#{port}"
  db_name ||= settings.db_name
  @clients["#{connection_uri}.#{db_name}"] ||= Mongo::Client.new([connection_uri], database: db_name)
end
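
Clients are memoized under a "connection_uri.db_name" key, so repeated calls with equivalent settings return the same Mongo::Client. A short illustration (the Settings construction is an assumption, as above):

settings = Dataflow::Adapters::Settings.new(db_name: 'dataflow_example', dataset_name: 'users')
c1 = Dataflow::Adapters::MongoDbAdapter.client(settings)
c2 = Dataflow::Adapters::MongoDbAdapter.client(settings)
c1.equal?(c2) # => true -- same memoized instance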

.disconnect_clients ⇒ Object

Force the clients to disconnect their connections. Use before forking.



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 26

def disconnect_clients
  @clients ||= {}
  @clients.values.each(&:close)
end
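
Open connections do not survive fork(2), hence the "use before forking" note above. A sketch:

Dataflow::Adapters::MongoDbAdapter.disconnect_clients
pid = Process.fork do
  do_child_work # hypothetical child-process work goes here
end
Process.wait(pid)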

Instance Method Details

#all(where: {}, fields: [], sort: {}, offset: 0, limit: 0) ⇒ Object

Retrieve all elements from a data node.



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 51

def all(where: {}, fields: [], sort: {}, offset: 0, limit: 0)
  projection = fields.map { |field| [field, 1] }

  unless fields.map(&:to_s).include?(SYSTEM_ID)
    # by default, do not select the _id field
    projection << [SYSTEM_ID, 0].freeze
  end

  opts = transform_to_query(where)
  res = client[read_dataset_name].find(opts)
  res = res.projection(projection.to_h)

  res = res.sort(sort)   if sort
  res = res.skip(offset) if offset > 0
  res = res.limit(limit) if limit > 0

  if block_given?
    yield res
  else
    res.to_a
  end
end
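
Example call; the '>=' operator format mirrors the queries built by #ordered_system_id_queries below, though transform_to_query itself is not shown on this page:

records = adapter.all(
  where:  { 'age' => { '>=' => 21 } }, # operator format assumed
  fields: %w(name age),
  sort:   { 'age' => -1 },
  limit:  10
)
# => e.g. [{ 'name' => 'Ada', 'age' => 42 }, ...] (no _id unless requested)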

#all_paginated(where: {}, fields: [], cursor: nil) ⇒ Hash

Helper that supports paginating through the whole dataset at a fixed per-page cost. Unlike offset/skip pagination, which requires reading through all of the skipped content (high CPU usage), this uses the internal MongoDB cursor to fetch batches of results.

Returns:

  • (Hash)

    with two fields: 'data' and 'next_cursor' (pass the latter to the next call)



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 79

def all_paginated(where: {}, fields: [], cursor: nil)
  cursor = cursor.to_i
  data = []

  # If there is no cursor, we make the initial query
  # get the first batch of data and get the cursor id.
  if cursor.zero?
    all(where: where, fields: fields) do |res|
      results = res.initial_query
      data = results.documents
      cursor = res.cursor.id
    end
  end

  # The first query's result batch is small (MongoDB returns at most
  # 101 documents in the initial batch), so we want to get one more
  # batch of data. However, there might be queries whose results are
  # very small and the resulting cursor is 0. In such a case there is
  # no more data to be fetched.
  unless cursor.zero?
    # send a getMore command on the cursor id
    command = { getMore: cursor, collection: read_dataset_name }
    result = client.database.command(command).documents[0]
    cursor = result['cursor']['id']
    data += result['cursor']['nextBatch']
  end

  # We want to return the cursor as a string.
  # If there is no cursor (zero) then make it empty
  cursor = '' if cursor.zero?

  { 'data' => data, 'next_cursor' => cursor.to_s }
rescue Mongo::Error::OperationFailure
  { 'data' => data, 'next_cursor' => '' }
end
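
A sketch of draining a dataset page by page; per the code above, next_cursor is '' once there is no more data:

cursor = nil
loop do
  page = adapter.all_paginated(fields: %w(name), cursor: cursor)
  handle_batch(page['data']) # hypothetical handler
  cursor = page['next_cursor']
  break if cursor == ''
end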

#count(where: {}) ⇒ Object

Count the number of records.



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 133

def count(where: {})
  client[read_dataset_name].count(transform_to_query(where))
end

#create_indexes(dataset: nil, type: :all, drop_retry_on_error: true) ⇒ Object

Create the indexes on this dataset.

Parameters:

  • dataset (String) (defaults to: nil)

    Specify on which dataset the operation will be performed. Default: the adapter's settings' dataset.

  • type (Symbol) (defaults to: :all)

    Select which index types to create. Can be :all (default), :unique_only, or :non_unique_only.

  • drop_retry_on_error (Boolean) (defaults to: true)

    If an index fails to be created, drop all the indexes on the dataset and retry once.



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 179

def create_indexes(dataset: nil, type: :all, drop_retry_on_error: true)
  dataset ||= write_dataset_name
  return unless settings.indexes.present?

  indexes = (settings.indexes || [])

  case type
  when :unique_only
    indexes = indexes.select { |idx| idx['unique'] }
  when :non_unique_only
    indexes = indexes.reject { |idx| idx['unique'] }
  end

  indexes = indexes.map { |x| format_index(x) }
  client[dataset].indexes.create_many(indexes)
rescue Mongo::Error::OperationFailure => e
  raise e unless drop_retry_on_error
  client[dataset].indexes.drop_all
  # retry once, preserving the original dataset and type arguments
  create_indexes(dataset: dataset, type: type, drop_retry_on_error: false)
end
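
The expected shape of settings.indexes is not shown on this page; only the 'unique' key is confirmed by the filtering above. An assumed declaration, for illustration:

# assumed index format -- only the 'unique' key is confirmed above
# settings.indexes = [
#   { 'key' => 'id', 'unique' => true },
#   { 'key' => %w(updated_at id) }
# ]
adapter.create_indexes(type: :unique_only)     # create unique indexes only
adapter.create_indexes(type: :non_unique_only) # then the remaining ones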

#delete(where: {}) ⇒ Object

Delete records that match the options.

Parameters:

  • where (defaults to: {})

    query to apply on the delete operation.



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 162

def delete(where: {})
  client[read_dataset_name].delete_many(transform_to_query(where))
end
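
For example (same assumed where-clause format as #all):

adapter.delete(where: { 'status' => 'archived' })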

#find(where: {}, fields: [], sort: {}, offset: 0) ⇒ Object

Retrieve a single element from a data node.



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 46

def find(where: {}, fields: [], sort: {}, offset: 0)
  all(where: where, fields: fields, sort: sort, offset: offset, limit: 1).first
end

#ordered_system_id_queries(batch_size:) ⇒ Object

Create queries that permit processing the whole dataset in parallel without using offsets.



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 116

def ordered_system_id_queries(batch_size:)
  ids = all(fields: [SYSTEM_ID], sort: { SYSTEM_ID => 1 }).map { |x| x[SYSTEM_ID].to_s }
  queries_count = (ids.size / batch_size.to_f).ceil
  Array.new(queries_count) do |i|
    from = ids[i * batch_size]
    to = ids[(i + 1) * batch_size] || ids[-1]
    is_last = i == queries_count - 1

    where_query = { SYSTEM_ID => { '>=' => from } }
    operator = is_last ? '<=' : '<'
    where_query[SYSTEM_ID][operator] = to

    where_query
  end
end
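
Each returned query covers one _id range, so the whole dataset can be processed by independent workers without overlap. A sketch:

queries = adapter.ordered_system_id_queries(batch_size: 10_000)
queries.each do |query| # or hand each query to a separate worker/process
  batch = adapter.all(where: query, fields: %w(_id name))
  # process the batch...
end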

#recreate_dataset(dataset: nil) ⇒ Object

Recreate the table/collection.



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 167

def recreate_dataset(dataset: nil)
  dataset ||= write_dataset_name
  collection = client[dataset]
  collection.drop
  collection.create
end

#save(records:, replace_by: nil) ⇒ Object

Save the given records.

Parameters:

  • replace_by (Array) (defaults to: nil)

    If the replace_by key(s) are provided, the operation tries to replace records with matching key values, and inserts them (upsert) when no match is found.



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 141

def save(records:, replace_by: nil)
  if replace_by.present?
    replace_keys = Array(replace_by)
    bulk_ops = records.map do |record|
      filter = replace_keys.map { |x| [x, record[x]] }.to_h
      {
        replace_one: {
          filter: filter,
          replacement: record,
          upsert: true
        }
      }
    end
    client[write_dataset_name].bulk_write(bulk_ops, ordered: false)
  else
    save_many(records: records)
  end
end
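
For example, upserting records by a logical key:

adapter.save(
  records: [{ 'id' => 1, 'name' => 'Ada' }, { 'id' => 2, 'name' => 'Grace' }],
  replace_by: 'id' # replace records whose 'id' matches; insert otherwise
)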

#update_settings(args) ⇒ Object



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 41

def update_settings(args)
  @settings = Dataflow::Adapters::Settings.new(args)
end

#usage(dataset:) ⇒ Object



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 200

def usage(dataset:)
  indexes = retrieve_collection_indexes(dataset)
  command = { collstats: dataset }
  result = client.database.command(command).documents[0]
  {
    memory: result['size'],
    storage: result['storageSize'],
    effective_indexes: indexes
  }
rescue Mongo::Error::OperationFailure, Mongo::Error::InvalidCollectionName
  {
    memory: 0,
    storage: 0,
    effective_indexes: indexes
  }
end
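
Example return value (numbers illustrative):

adapter.usage(dataset: 'users')
# => { memory: 1_048_576, storage: 4_194_304, effective_indexes: [...] }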