Class: RubyMongoX
- Inherits:
-
Object
- Object
- RubyMongoX
- Includes:
- Helpers::Loggable
- Defined in:
- lib/ruby_mongo_x.rb
Class Method Summary collapse
Instance Method Summary collapse
- #count(query) ⇒ Object
- #count_data_from_shard(mongo_client, query) ⇒ Object
- #count_per_shard(query) ⇒ Object
- #disconnect ⇒ Object
- #fetch_data_from_shard(mongo_client, query) ⇒ Object
- #find(query) ⇒ Object
- #get_collection(key) ⇒ Object
-
#get_shard_index(key) ⇒ Object
log_debug “tEnsuring index: #index_definitions on #collection_name (#mongo_user)…
-
#initialize(collection_name, shards, max_count_per_shard) ⇒ RubyMongoX
constructor
A new instance of RubyMongoX.
- #update_many ⇒ Object
- #update_one ⇒ Object
Methods included from Helpers::Loggable
Constructor Details
#initialize(collection_name, shards, max_count_per_shard) ⇒ RubyMongoX
Returns a new instance of RubyMongoX.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/ruby_mongo_x.rb', line 12 def initialize(collection_name, shards, max_count_per_shard) @max_count_per_shard = max_count_per_shard @mongo_clients = {} threads = [] = {} shards.each_with_index do |shard, idx| threads << Thread.new do log_debug "\t Connecting to shard #{shard} ...\n" begin @mongo_clients[idx.to_s] = ::Mongo::Client.new(shard, ) rescue Mongo::Error::SocketError => e log_debug "\t SocketError: #{e.message}...\n" log_debug "\t SocketError: Stopped trying to connect to shard #{shard}... FAIL!\n" rescue Mongo::Error::NoSRVRecords => e log_debug "\t NoSRVRecords: #{e.message}...\n" log_debug "\t NoSRVRecords: Retrying connect to shard #{shard}...\n" sleep 1 retry rescue Mongo::Error::SocketTimeoutError => e log_debug "\t SocketTimeoutError: #{e.message}...\n" log_debug "\t SocketTimeoutError: Retrying connect to shard #{shard}...\n" sleep 1 retry end log_debug "\t Connected to shard #{shard} --> OK\n" end end threads.each(&:join) @collection_name = collection_name end |
Class Method Details
.build(collection_name, shards, max_count_per_shard) ⇒ Object
8 9 10 |
# File 'lib/ruby_mongo_x.rb', line 8 def self.build(collection_name, shards, max_count_per_shard) new(collection_name, shards, max_count_per_shard) end |
Instance Method Details
#count(query) ⇒ Object
85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/ruby_mongo_x.rb', line 85 def count(query) log_debug "\t Count query: #{query.to_json}\n" promises = @mongo_clients.map do |mongo_client_item| mongo_client = mongo_client_item[1] Concurrent::Promise.execute do count_data_from_shard(mongo_client, query) end end results = promises.map { |promise| promise.value[:count] }.sort.reverse log_debug "Results sorted: #{results}\n" merged_results = results.sum() merged_results end |
#count_data_from_shard(mongo_client, query) ⇒ Object
145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/ruby_mongo_x.rb', line 145 def count_data_from_shard(mongo_client, query) collection = mongo_client[@collection_name.to_sym] count = collection.count_documents(query) count = 0 if count.nil? mongo_server = mongo_client&.cluster&.servers&.first mongo_user = mongo_server&.["user"] || "" mongo_db = mongo_server&.["database"] || "" log_debug "\t\tcount() with mongo_client user: #{mongo_user}, database: #{mongo_db} returns #{count}\n" { shard: (mongo_user.nil? || mongo_user == "") ? mongo_db : mongo_user, count: count } end |
#count_per_shard(query) ⇒ Object
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/ruby_mongo_x.rb', line 99 def count_per_shard(query) log_debug "\t Count query per shard: #{query.to_json}\n" promises = @mongo_clients.map do |mongo_client_item| mongo_client = mongo_client_item[1] Concurrent::Promise.execute do count_data_from_shard(mongo_client, query) end end results = { count: 0, shards: {} } promises.map { |promise| results[:shards][promise.value[:shard]] = promise.value[:count] results[:count] = results[:count] + promise.value[:count] } log_debug "Results: #{results}\n" results end |
#disconnect ⇒ Object
160 161 162 163 164 165 166 |
# File 'lib/ruby_mongo_x.rb', line 160 def disconnect() @mongo_clients.values.each do |client| log_debug "\t Disconnecting from client #{client.to_s} ..." client.close log_debug " OK\n" end end |
#fetch_data_from_shard(mongo_client, query) ⇒ Object
131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/ruby_mongo_x.rb', line 131 def fetch_data_from_shard(mongo_client, query) collection = mongo_client[@collection_name.to_sym] results = collection.find(query).to_a mongo_server = mongo_client&.cluster&.servers&.first mongo_user = mongo_server&.["user"] || "" mongo_db = mongo_server&.["database"] || "" log_debug "\t\tfind() with mongo_client user: #{mongo_user}, database: #{mongo_db} returns #{results.count} items...\n" { shard: (mongo_user.nil? || mongo_user == "") ? mongo_db : mongo_user, results: results } end |
#find(query) ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/ruby_mongo_x.rb', line 72 def find(query) log_debug "\t Query: #{query.to_json}\n" promises = @mongo_clients.map do |mongo_client_item| mongo_client = mongo_client_item[1] Concurrent::Promise.execute do fetch_data_from_shard(mongo_client, query) end end results = promises.map { |promise| promise.value[:results] } merged_results = results.flatten merged_results end |
#get_collection(key) ⇒ Object
47 48 49 50 51 52 53 54 |
# File 'lib/ruby_mongo_x.rb', line 47 def get_collection(key) shard_idx = get_shard_index(key) unless @mongo_clients[shard_idx.to_s].nil? return @mongo_clients[shard_idx.to_s][@collection_name.to_sym] else raise StandardError.new("No Mongo shard for shard index: #{shard_idx} !!!") end end |
#get_shard_index(key) ⇒ Object
log_debug “tEnsuring index: #index_definitions on #collection_name (#mongo_user)… ”
mongo_client[collection_name.to_sym].indexes.create_many(index_definitions)
log_debug " OK\n"
end
end
68 69 70 |
# File 'lib/ruby_mongo_x.rb', line 68 def get_shard_index(key) ((key.to_i - 1).to_f / @max_count_per_shard).to_i end |
#update_many ⇒ Object
127 128 129 |
# File 'lib/ruby_mongo_x.rb', line 127 def update_many() # TODO: implement end |
#update_one ⇒ Object
123 124 125 |
# File 'lib/ruby_mongo_x.rb', line 123 def update_one() # TODO: implement end |