Class: RubyMongoX

Inherits:
Object
  • Object
show all
Includes:
Helpers::Loggable
Defined in:
lib/ruby_mongo_x.rb

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Helpers::Loggable

#log_debug

Constructor Details

#initialize(collection_name, shards, max_count_per_shard) ⇒ RubyMongoX

Returns a new instance of RubyMongoX.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/ruby_mongo_x.rb', line 12

# Opens one Mongo client per shard, connecting to all shards in parallel.
#
# Each shard is connected on its own thread and, on success, stored in
# @mongo_clients under its position in +shards+ as a String key ("0", "1", ...).
# A shard whose connection ultimately fails is left out of @mongo_clients.
#
# @param collection_name [String, Symbol] collection used by the query methods
# @param shards [Array] connection targets, each passed as-is to Mongo::Client.new
# @param max_count_per_shard [Integer] divisor used by #get_shard_index to map a key to a shard
def initialize(collection_name, shards, max_count_per_shard)
  @collection_name = collection_name
  @max_count_per_shard = max_count_per_shard
  @mongo_clients = {}

  options = {}
  # Upper bound on connection attempts for transient errors, so that an
  # unreachable shard cannot block construction forever.
  max_attempts = 5

  threads = shards.each_with_index.map do |shard, idx|
    Thread.new do
      log_debug "\t Connecting to shard #{shard} ...\n"
      attempts = 0
      begin
        attempts += 1
        @mongo_clients[idx.to_s] = ::Mongo::Client.new(shard, options)
        # Logged inside the begin block so a failed shard is never reported as connected.
        log_debug "\t Connected to shard #{shard} --> OK\n"
      rescue Mongo::Error::NoSRVRecords, Mongo::Error::SocketTimeoutError => e
        # Transient errors: retry a bounded number of times.
        # SocketTimeoutError has to be rescued before SocketError (its parent
        # class in the driver), otherwise this branch is never reached.
        label = e.class.name.split("::").last
        log_debug "\t #{label}: #{e.message}...\n"
        if attempts < max_attempts
          log_debug "\t #{label}: Retrying connect to shard #{shard}...\n"
          sleep 1
          retry
        end
        log_debug "\t #{label}: Stopped trying to connect to shard #{shard}... FAIL!\n"
      rescue Mongo::Error::SocketError => e
        # Non-transient socket failure: give up on this shard immediately.
        log_debug "\t SocketError: #{e.message}...\n"
        log_debug "\t SocketError: Stopped trying to connect to shard #{shard}... FAIL!\n"
      end
    end
  end

  threads.each(&:join)
end

Class Method Details

.build(collection_name, shards, max_count_per_shard) ⇒ Object



8
9
10
# File 'lib/ruby_mongo_x.rb', line 8

# Factory method: forwards its three arguments, unchanged and in order, to .new.
#
# @param collection_name [Object] forwarded to #initialize
# @param shards [Object] forwarded to #initialize
# @param max_count_per_shard [Object] forwarded to #initialize
# @return [Object] whatever .new returns
def self.build(collection_name, shards, max_count_per_shard)
  constructor_args = [collection_name, shards, max_count_per_shard]
  new(*constructor_args)
end

Instance Method Details

#count(query) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/ruby_mongo_x.rb', line 85

# Counts the documents matching +query+ across every connected shard.
#
# One Concurrent::Promise is started per client in @mongo_clients; each runs
# #count_data_from_shard. The per-shard counts are logged (largest first)
# and summed.
#
# @param query [Object] filter passed through to #count_data_from_shard (must respond to #to_json for logging)
# @return [Integer] total count over all shards (0 when there are no clients)
# @raise [StandardError] the original error from a shard whose count failed
def count(query)
  log_debug "\t Count query: #{query.to_json}\n"
  promises = @mongo_clients.each_value.map do |mongo_client|
    Concurrent::Promise.execute { count_data_from_shard(mongo_client, query) }
  end

  # value! re-raises a shard's failure; plain value would return nil for a
  # rejected promise and blow up with an unrelated NoMethodError on nil.
  counts = promises.map { |promise| promise.value![:count] }.sort.reverse
  log_debug "Results sorted: #{counts}\n"
  counts.sum
end

#count_data_from_shard(mongo_client, query) ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/ruby_mongo_x.rb', line 145

# Counts the documents matching +query+ on a single shard.
#
# The shard label is the "user" option of the first server in the client's
# cluster, falling back to its "database" option, and to "" when neither is
# available.
#
# @param mongo_client [Object] client for one shard; indexed by collection name and asked for its cluster
# @param query [Object] filter passed to count_documents
# @return [Hash] { shard: <label>, count: <count> }
def count_data_from_shard(mongo_client, query)
  collection = mongo_client[@collection_name.to_sym]
  count = collection.count_documents(query) || 0

  # `&.` only guards the call it is attached to, so the options lookup is
  # defaulted to an empty hash before indexing; otherwise a missing server
  # would raise NoMethodError on nil.
  server_options = mongo_client.cluster&.servers&.first&.options || {}
  mongo_user = server_options["user"] || ""
  mongo_db = server_options["database"] || ""
  log_debug "\t\tcount() with mongo_client user: #{mongo_user}, database: #{mongo_db} returns #{count}\n"

  {
    shard: mongo_user.empty? ? mongo_db : mongo_user,
    count: count
  }
end

#count_per_shard(query) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/ruby_mongo_x.rb', line 99

# Counts the documents matching +query+ on every shard and reports both the
# per-shard breakdown and the grand total.
#
# One Concurrent::Promise is started per client in @mongo_clients; each runs
# #count_data_from_shard. Shards reporting the same label overwrite each
# other in the breakdown, but every shard still contributes to the total.
#
# @param query [Object] filter passed through to #count_data_from_shard (must respond to #to_json for logging)
# @return [Hash] { count: <total>, shards: { <shard label> => <count> } }
# @raise [StandardError] the original error from a shard whose count failed
def count_per_shard(query)
  log_debug "\t Count query per shard: #{query.to_json}\n"
  promises = @mongo_clients.each_value.map do |mongo_client|
    Concurrent::Promise.execute { count_data_from_shard(mongo_client, query) }
  end

  results = { count: 0, shards: {} }

  promises.each do |promise|
    # value! re-raises a shard's failure; plain value would return nil for a
    # rejected promise and fail with an unrelated NoMethodError on nil.
    shard_result = promise.value!
    results[:shards][shard_result[:shard]] = shard_result[:count]
    results[:count] += shard_result[:count]
  end

  log_debug "Results: #{results}\n"
  results
end

#disconnect ⇒ Object



160
161
162
163
164
165
166
# File 'lib/ruby_mongo_x.rb', line 160

# Closes every client held in @mongo_clients, logging before and after each close.
#
# @return [Array] the clients that were closed
def disconnect
  clients = @mongo_clients.values
  clients.each do |mongo_client|
    log_debug "\t Disconnecting from client #{mongo_client} ..."
    mongo_client.close
    log_debug " OK\n"
  end
end

#fetch_data_from_shard(mongo_client, query) ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/ruby_mongo_x.rb', line 131

# Fetches all documents matching +query+ from a single shard.
#
# The shard label is the "user" option of the first server in the client's
# cluster, falling back to its "database" option, and to "" when neither is
# available.
#
# @param mongo_client [Object] client for one shard; indexed by collection name and asked for its cluster
# @param query [Object] filter passed to find
# @return [Hash] { shard: <label>, results: <Array of matching documents> }
def fetch_data_from_shard(mongo_client, query)
  collection = mongo_client[@collection_name.to_sym]
  results = collection.find(query).to_a

  # `&.` only guards the call it is attached to, so the options lookup is
  # defaulted to an empty hash before indexing; otherwise a missing server
  # would raise NoMethodError on nil.
  server_options = mongo_client.cluster&.servers&.first&.options || {}
  mongo_user = server_options["user"] || ""
  mongo_db = server_options["database"] || ""
  log_debug "\t\tfind() with mongo_client user: #{mongo_user}, database: #{mongo_db} returns #{results.count} items...\n"

  {
    shard: mongo_user.empty? ? mongo_db : mongo_user,
    results: results
  }
end

#find(query) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/ruby_mongo_x.rb', line 72

# Runs +query+ against every connected shard and merges the documents.
#
# One Concurrent::Promise is started per client in @mongo_clients; each runs
# #fetch_data_from_shard. The per-shard result arrays are concatenated in
# the iteration order of @mongo_clients.
#
# @param query [Object] filter passed through to #fetch_data_from_shard (must respond to #to_json for logging)
# @return [Array] all matching documents from all shards
# @raise [StandardError] the original error from a shard whose fetch failed
def find(query)
  log_debug "\t Query: #{query.to_json}\n"
  promises = @mongo_clients.each_value.map do |mongo_client|
    Concurrent::Promise.execute { fetch_data_from_shard(mongo_client, query) }
  end

  # value! re-raises a shard's failure; plain value would return nil for a
  # rejected promise and fail with an unrelated NoMethodError on nil.
  promises.flat_map { |promise| promise.value![:results] }
end

#get_collection(key) ⇒ Object



47
48
49
50
51
52
53
54
# File 'lib/ruby_mongo_x.rb', line 47

# Returns the collection on the shard responsible for +key+.
#
# The shard index comes from #get_shard_index and is looked up in
# @mongo_clients by its String form.
#
# @param key [Object] key passed to #get_shard_index
# @return [Object] the @collection_name collection of the matching client
# @raise [StandardError] when no client is registered for the computed shard index
def get_collection(key)
  shard_idx = get_shard_index(key)
  mongo_client = @mongo_clients[shard_idx.to_s]
  raise StandardError, "No Mongo shard for shard index: #{shard_idx} !!!" if mongo_client.nil?

  mongo_client[@collection_name.to_sym]
end

#get_shard_index(key) ⇒ Object

log_debug "\tEnsuring index: #{index_definitions} on #{collection_name} (#{mongo_user})... "

  mongo_client[collection_name.to_sym].indexes.create_many(index_definitions)
  log_debug " OK\n"
end

end



68
69
70
# File 'lib/ruby_mongo_x.rb', line 68

# Maps a key to the index of the shard that holds it.
#
# The key is converted with #to_i, shifted down by one, divided by
# @max_count_per_shard as a float, and truncated toward zero. With a
# shard size of 10, keys 1..10 give 0 and keys 11..20 give 1.
#
# @param key [#to_i] key to locate
# @return [Integer] shard index
def get_shard_index(key)
  zero_based_key = key.to_i - 1
  shard_position = zero_based_key.to_f / @max_count_per_shard
  shard_position.to_i
end

#update_many ⇒ Object



127
128
129
# File 'lib/ruby_mongo_x.rb', line 127

# Placeholder for a multi-document update; not implemented yet.
#
# @return [nil] currently does nothing
def update_many
  # TODO: implement
  nil
end

#update_one ⇒ Object



123
124
125
# File 'lib/ruby_mongo_x.rb', line 123

# Placeholder for a single-document update; not implemented yet.
#
# @return [nil] currently does nothing
def update_one
  # TODO: implement
  nil
end