Class: RubyMongoX

Inherits:
Object
  • Object
show all
Includes:
Helpers::Loggable
Defined in:
lib/ruby_mongo_x.rb

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Helpers::Loggable

#log_debug

Constructor Details

#initialize(collection_name, shards, max_count_per_shard) ⇒ RubyMongoX

Returns a new instance of RubyMongoX.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/ruby_mongo_x.rb', line 12

def initialize(collection_name, shards, max_count_per_shard)
  @max_count_per_shard = max_count_per_shard
  @mongo_clients = {}

  threads = []

  options = {}

  shards.each_with_index do |shard, idx|
    threads << Thread.new do
      log_debug "\t Connecting to shard #{shard} ...\n"
      begin
        @mongo_clients[idx.to_s] = ::Mongo::Client.new(shard, options)
      rescue Mongo::Error::NoSRVRecords => e
        log_debug "\t NoSRVRecords: #{e.message}...\n"
        log_debug "\t NoSRVRecords: Retrying connect to shard #{shard}...\n"
        sleep 1
        retry
      rescue Mongo::Error::SocketTimeoutError => e
        log_debug "\t SocketTimeoutError: #{e.message}...\n"
        log_debug "\t SocketTimeoutError: Retrying connect to shard #{shard}...\n"
        sleep 1
        retry
      end
      log_debug "\t Connected to shard #{shard} --> OK\n"
    end
  end

  threads.each(&:join)
  @collection_name = collection_name
end

Class Method Details

.build(collection_name, shards, max_count_per_shard) ⇒ Object



8
9
10
# File 'lib/ruby_mongo_x.rb', line 8

def self.build(collection_name, shards, max_count_per_shard)
  new(collection_name, shards, max_count_per_shard)
end

Instance Method Details

#count(query) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/ruby_mongo_x.rb', line 82

def count(query)
  log_debug "\t Count query: #{query.to_json}\n"
  promises = @mongo_clients.map do |mongo_client_item|
    mongo_client = mongo_client_item[1]
    Concurrent::Promise.execute do
      count_data_from_shard(mongo_client, query)
    end
  end
  results = promises.map { |promise| promise.value[:count] }.sort.reverse
  log_debug "Results sorted: #{results}\n"
  merged_results = results.sum()
  merged_results
end

#count_data_from_shard(mongo_client, query) ⇒ Object



142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/ruby_mongo_x.rb', line 142

def count_data_from_shard(mongo_client, query)
  collection = mongo_client[@collection_name.to_sym]
  count = collection.count_documents(query)
  count = 0 if count.nil?
  mongo_server = mongo_client&.cluster&.servers&.first
  mongo_user = mongo_server&.options["user"] || ""
  mongo_db = mongo_server&.options["database"] || ""
  log_debug "\t\tcount() with mongo_client user: #{mongo_user}, database: #{mongo_db} returns #{count}\n"

  {
    shard: (mongo_user.nil? || mongo_user == "") ? mongo_db : mongo_user,
    count: count
  }
end

#count_per_shard(query) ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/ruby_mongo_x.rb', line 96

def count_per_shard(query)
  log_debug "\t Count query per shard: #{query.to_json}\n"
  promises = @mongo_clients.map do |mongo_client_item|
    mongo_client = mongo_client_item[1]
    Concurrent::Promise.execute do
      count_data_from_shard(mongo_client, query)
    end
  end

  results = {
    count: 0,
    shards: {}
  }

  promises.map { |promise|
    results[:shards][promise.value[:shard]] = promise.value[:count]
    results[:count] = results[:count] + promise.value[:count]
  }

  log_debug "Results: #{results}\n"
  results
end

#disconnectObject



157
158
159
160
161
162
163
# File 'lib/ruby_mongo_x.rb', line 157

def disconnect()
  @mongo_clients.values.each do |client|
    log_debug "\t Disconnecting from client #{client.to_s} ..."
    client.close
    log_debug " OK\n"
  end
end

#fetch_data_from_shard(mongo_client, query) ⇒ Object



128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/ruby_mongo_x.rb', line 128

def fetch_data_from_shard(mongo_client, query)
  collection = mongo_client[@collection_name.to_sym]
  results = collection.find(query).to_a
  mongo_server = mongo_client&.cluster&.servers&.first
  mongo_user = mongo_server&.options["user"] || ""
  mongo_db = mongo_server&.options["database"] || ""
  log_debug "\t\tfind() with mongo_client user: #{mongo_user}, database: #{mongo_db} returns #{results.count} items...\n"

  {
    shard: (mongo_user.nil? || mongo_user == "") ? mongo_db : mongo_user,
    results: results
  }
end

#find(query) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/ruby_mongo_x.rb', line 69

def find(query)
  log_debug "\t Query: #{query.to_json}\n"
  promises = @mongo_clients.map do |mongo_client_item|
    mongo_client = mongo_client_item[1]
    Concurrent::Promise.execute do
      fetch_data_from_shard(mongo_client, query)
    end
  end
  results = promises.map { |promise| promise.value[:results] }
  merged_results = results.flatten
  merged_results
end

#get_collection(key) ⇒ Object



44
45
46
47
48
49
50
51
# File 'lib/ruby_mongo_x.rb', line 44

def get_collection(key)
  shard_idx = get_shard_index(key)
  unless @mongo_clients[shard_idx.to_s].nil?
    return @mongo_clients[shard_idx.to_s][@collection_name.to_sym]
  else
    raise StandardError.new("No Mongo shard for shard index: #{shard_idx} !!!")
  end
end

#get_shard_index(key) ⇒ Object

log_debug “tEnsuring index: #index_definitions on #collection_name (#mongo_user)… ”

  mongo_client[collection_name.to_sym].indexes.create_many(index_definitions)
  log_debug " OK\n"
end

end



65
66
67
# File 'lib/ruby_mongo_x.rb', line 65

def get_shard_index(key)
  ((key.to_i - 1).to_f / @max_count_per_shard).to_i
end

#update_manyObject



124
125
126
# File 'lib/ruby_mongo_x.rb', line 124

def update_many()
  # TODO: implement
end

#update_oneObject



120
121
122
# File 'lib/ruby_mongo_x.rb', line 120

def update_one()
  # TODO: implement
end