Class: Octopus::Proxy
- Inherits:
-
Object
- Object
- Octopus::Proxy
- Defined in:
- lib/octopus/proxy.rb
Constant Summary collapse
- CURRENT_MODEL_KEY =
'octopus.current_model'.freeze
- CURRENT_SHARD_KEY =
'octopus.current_shard'.freeze
- CURRENT_GROUP_KEY =
'octopus.current_group'.freeze
- CURRENT_SLAVE_GROUP_KEY =
'octopus.current_slave_group'.freeze
- CURRENT_LOAD_BALANCE_OPTIONS_KEY =
'octopus.current_load_balance_options'.freeze
- BLOCK_KEY =
'octopus.block'.freeze
- LAST_CURRENT_SHARD_KEY =
'octopus.last_current_shard'.freeze
- FULLY_REPLICATED_KEY =
'octopus.fully_replicated'.freeze
Instance Attribute Summary collapse
-
#config ⇒ Object
Returns the value of attribute config.
-
#sharded ⇒ Object
Returns the value of attribute sharded.
Instance Method Summary collapse
- #block ⇒ Object
- #block=(block) ⇒ Object
- #check_schema_migrations(shard) ⇒ Object
- #clean_connection_proxy ⇒ Object
- #clear_active_connections! ⇒ Object
- #clear_all_connections! ⇒ Object
- #clear_query_cache ⇒ Object
- #connected? ⇒ Boolean
- #connection_pool ⇒ Object
- #current_group ⇒ Object
- #current_group=(group_symbol) ⇒ Object
- #current_load_balance_options ⇒ Object
- #current_load_balance_options=(options) ⇒ Object
- #current_model ⇒ Object
- #current_model=(model) ⇒ Object
- #current_shard ⇒ Object
- #current_shard=(shard_symbol) ⇒ Object
- #current_slave_group ⇒ Object
- #current_slave_group=(slave_group_symbol) ⇒ Object
- #disable_query_cache! ⇒ Object
- #enable_query_cache! ⇒ Object
- #fully_replicated? ⇒ Boolean
-
#has_group?(group) ⇒ Boolean
Public: Whether or not a group exists with the given name converted to a string.
-
#initialize(config = Octopus.config) ⇒ Proxy
constructor
A new instance of Proxy.
- #initialize_replication(config) ⇒ Object
- #initialize_shards(config) ⇒ Object
- #last_current_shard ⇒ Object
- #last_current_shard=(last_current_shard) ⇒ Object
- #method_missing(method, *args, &block) ⇒ Object
- #respond_to?(method, include_private = false) ⇒ Boolean
- #run_queries_on_shard(shard, &_block) ⇒ Object
-
#safe_connection(connection_pool) ⇒ Object
Rails 3.1 sets automatic_reconnect to false when it removes connection pool.
- #select_connection ⇒ Object
- #send_queries_to_all_shards(&block) ⇒ Object
- #send_queries_to_group(group, &block) ⇒ Object
- #send_queries_to_multiple_shards(shards, &block) ⇒ Object
- #send_queries_to_shard_slave_group(method, *args, &block) ⇒ Object
- #send_queries_to_slave_group(method, *args, &block) ⇒ Object
- #shard_name ⇒ Object
-
#shard_names ⇒ Object
Public: Retrieves names of all loaded shards.
-
#shards_for_group(group) ⇒ Object
Public: Retrieves the defined shards for a given group.
- #should_clean_table_name? ⇒ Boolean
- #should_send_queries_to_shard_slave_group?(method) ⇒ Boolean
- #should_send_queries_to_slave_group?(method) ⇒ Boolean
- #transaction(options = {}, &block) ⇒ Object
Constructor Details
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(method, *args, &block) ⇒ Object
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 |
# File 'lib/octopus/proxy.rb', line 290
# Forwards any method this proxy does not define to a database connection.
#
# The routing checks run in a fixed order and the first match wins:
#   1. methods for which should_clean_connection_proxy? is true: the
#      connection is selected first, the current shard is remembered in
#      last_current_shard, the proxy state is reset, and only then is the
#      call forwarded to the connection selected earlier;
#   2. the slave group of the current shard;
#   3. a slave group;
#   4. a selected slave of the replicated databases;
#   5. otherwise the currently selected connection.
def method_missing(method, *args, &block)
  if should_clean_connection_proxy?(method)
    # Select before cleaning: cleaning resets the shard used for selection.
    connection = select_connection
    self.last_current_shard = current_shard
    clean_connection_proxy
    return connection.send(method, *args, &block)
  end

  return send_queries_to_shard_slave_group(method, *args, &block) if should_send_queries_to_shard_slave_group?(method)
  return send_queries_to_slave_group(method, *args, &block) if should_send_queries_to_slave_group?(method)
  return send_queries_to_selected_slave(method, *args, &block) if should_send_queries_to_replicated_databases?(method)

  select_connection.send(method, *args, &block)
end
Instance Attribute Details
#config ⇒ Object
Returns the value of attribute config.
7 8 9 |
# File 'lib/octopus/proxy.rb', line 7 def config @config end |
#sharded ⇒ Object
Returns the value of attribute sharded.
7 8 9 |
# File 'lib/octopus/proxy.rb', line 7 def sharded @sharded end |
Instance Method Details
#block ⇒ Object
176 177 178 |
# File 'lib/octopus/proxy.rb', line 176 def block Thread.current[BLOCK_KEY] end |
#block=(block) ⇒ Object
180 181 182 |
# File 'lib/octopus/proxy.rb', line 180 def block=(block) Thread.current[BLOCK_KEY] = block end |
#check_schema_migrations(shard) ⇒ Object
274 275 276 277 278 |
# File 'lib/octopus/proxy.rb', line 274
# Makes sure the schema migrations table exists on the given shard.
#
# Asks the shard's connection whether the table named by
# ActiveRecord::Migrator.schema_migrations_table_name exists; when it does
# not, calls initialize_schema_migrations_table on that shard's connection.
#
# Returns the table_exists? result when truthy, otherwise the result of
# initialize_schema_migrations_table.
def check_schema_migrations(shard)
  table_present = OctopusModel.using(shard).connection.table_exists?(
    ActiveRecord::Migrator.schema_migrations_table_name,
  )
  return table_present if table_present

  OctopusModel.using(shard).connection.initialize_schema_migrations_table
end
#clean_connection_proxy ⇒ Object
267 268 269 270 271 272 |
# File 'lib/octopus/proxy.rb', line 267 def clean_connection_proxy self.current_shard = Octopus.master_shard self.current_model = nil self.current_group = nil self.block = nil end |
#clear_active_connections! ⇒ Object
328 329 330 |
# File 'lib/octopus/proxy.rb', line 328
# Calls release_connection on each object yielded by with_each_healthy_shard.
def clear_active_connections!
  with_each_healthy_shard { |pool| pool.release_connection }
end
#clear_all_connections! ⇒ Object
332 333 334 |
# File 'lib/octopus/proxy.rb', line 332
# Calls disconnect! on each object yielded by with_each_healthy_shard.
def clear_all_connections!
  with_each_healthy_shard { |pool| pool.disconnect! }
end
#clear_query_cache ⇒ Object
324 325 326 |
# File 'lib/octopus/proxy.rb', line 324
# Clears the query cache of every healthy shard that is already connected.
# Shards whose connected? is falsy are skipped, so no connection is opened
# just to clear a cache.
def clear_query_cache
  with_each_healthy_shard do |pool|
    pool.connected? && safe_connection(pool).clear_query_cache
  end
end
#connected? ⇒ Boolean
336 337 338 |
# File 'lib/octopus/proxy.rb', line 336
# Returns true when at least one entry in @shards reports connected?,
# false otherwise (including when there are no shards).
def connected?
  @shards.each_value.any? { |pool| pool.connected? }
end
#connection_pool ⇒ Object
311 312 313 |
# File 'lib/octopus/proxy.rb', line 311 def connection_pool @shards[current_shard] end |
#current_group ⇒ Object
146 147 148 |
# File 'lib/octopus/proxy.rb', line 146 def current_group Thread.current[CURRENT_GROUP_KEY] end |
#current_group=(group_symbol) ⇒ Object
150 151 152 153 154 155 156 157 |
# File 'lib/octopus/proxy.rb', line 150
# Stores the current group (or groups) on Thread.current after validating it.
#
# group_symbol - a single group name, an Array of group names (nested arrays
#                are flattened and nil entries ignored), or nil.
#
# Raises RuntimeError when one or more names fail has_group?. The message
# lists every unknown name, comma separated; with a single unknown name it
# reads exactly "Nonexistent Group Name: <name>". Nothing is stored when
# validation fails.
def current_group=(group_symbol)
  unknown_groups = [group_symbol].flatten.compact.reject { |group| has_group?(group) }
  fail "Nonexistent Group Name: #{unknown_groups.join(', ')}" unless unknown_groups.empty?

  # The value is stored as given (not flattened), so callers read back what they set.
  Thread.current[CURRENT_GROUP_KEY] = group_symbol
end
#current_load_balance_options ⇒ Object
168 169 170 |
# File 'lib/octopus/proxy.rb', line 168
# Returns the load-balance options stored on Thread.current under
# CURRENT_LOAD_BALANCE_OPTIONS_KEY.
def current_load_balance_options
  Thread.current[CURRENT_LOAD_BALANCE_OPTIONS_KEY]
end
#current_load_balance_options=(options) ⇒ Object
172 173 174 |
# File 'lib/octopus/proxy.rb', line 172
# Stores the given load-balance options on Thread.current under
# CURRENT_LOAD_BALANCE_OPTIONS_KEY.
def current_load_balance_options=(options)
  Thread.current[CURRENT_LOAD_BALANCE_OPTIONS_KEY] = options
end
#current_model ⇒ Object
101 102 103 |
# File 'lib/octopus/proxy.rb', line 101 def current_model Thread.current[CURRENT_MODEL_KEY] end |
#current_model=(model) ⇒ Object
105 106 107 |
# File 'lib/octopus/proxy.rb', line 105
# Stores the current model on Thread.current under CURRENT_MODEL_KEY.
#
# model - when it is an ActiveRecord::Base instance its class is stored;
#         any other value (a class, nil, ...) is stored unchanged.
def current_model=(model)
  stored = if model.is_a?(ActiveRecord::Base)
             model.class
           else
             model
           end
  Thread.current[CURRENT_MODEL_KEY] = stored
end
#current_shard ⇒ Object
109 110 111 |
# File 'lib/octopus/proxy.rb', line 109
# Returns the shard stored on Thread.current under CURRENT_SHARD_KEY.
# When nothing truthy is stored yet, Octopus.master_shard is stored there
# and returned.
def current_shard
  shard = Thread.current[CURRENT_SHARD_KEY]
  return shard if shard

  Thread.current[CURRENT_SHARD_KEY] = Octopus.master_shard
end
#current_shard=(shard_symbol) ⇒ Object
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/octopus/proxy.rb', line 113
# Validates the requested shard and stores it on Thread.current under
# CURRENT_SHARD_KEY.
#
# shard_symbol - one of:
#   * an Array of shard names: every name must be present in @shards, the
#     current slave group is reset to nil, and the Array itself is stored;
#   * a Hash with :shard, :slave_group and :load_balance_options entries:
#     at least one of :shard / :slave_group must be non-nil; the shard and
#     slave group are validated, the slave group and load-balance options
#     are applied through their setters, and hash[:shard] is stored;
#   * anything else: treated as a single shard name that must be present
#     in @shards.
#
# Raises RuntimeError for an unknown shard name, an unknown slave group
# name, or a Hash that names neither a shard nor a slave group.
def current_shard=(shard_symbol)
  if shard_symbol.is_a?(Array)
    self.current_slave_group = nil
    shard_symbol.each { |symbol| fail "Nonexistent Shard Name: #{symbol}" if @shards[symbol].nil? }
  elsif shard_symbol.is_a?(Hash)
    hash = shard_symbol
    shard_symbol = hash[:shard]
    slave_group_symbol = hash[:slave_group]
    load_balance_options = hash[:load_balance_options]

    # Message text kept verbatim; it fires when BOTH entries are missing.
    if shard_symbol.nil? && slave_group_symbol.nil?
      fail 'Neither shard or slave group must be specified'
    end

    if shard_symbol.present?
      fail "Nonexistent Shard Name: #{shard_symbol}" if @shards[shard_symbol].nil?
    end

    if slave_group_symbol.present?
      # A shard with its own slave groups must define the requested group;
      # a shard without any falls back to the top-level @slave_groups.
      if (@shards_slave_groups.try(:[], shard_symbol).present? && @shards_slave_groups[shard_symbol][slave_group_symbol].nil?) ||
         (@shards_slave_groups.try(:[], shard_symbol).nil? && @slave_groups[slave_group_symbol].nil?)
        fail "Nonexistent Slave Group Name: #{slave_group_symbol} in shards config: #{@shards_config.inspect}"
      end
    end

    self.current_slave_group = slave_group_symbol
    self.current_load_balance_options = load_balance_options
  else
    fail "Nonexistent Shard Name: #{shard_symbol}" if @shards[shard_symbol].nil?
  end

  Thread.current[CURRENT_SHARD_KEY] = shard_symbol
end
#current_slave_group ⇒ Object
159 160 161 |
# File 'lib/octopus/proxy.rb', line 159 def current_slave_group Thread.current[CURRENT_SLAVE_GROUP_KEY] end |
#current_slave_group=(slave_group_symbol) ⇒ Object
163 164 165 166 |
# File 'lib/octopus/proxy.rb', line 163
# Stores the slave group on Thread.current under CURRENT_SLAVE_GROUP_KEY.
# Setting it to nil also clears the value stored under
# CURRENT_LOAD_BALANCE_OPTIONS_KEY.
def current_slave_group=(slave_group_symbol)
  Thread.current[CURRENT_SLAVE_GROUP_KEY] = slave_group_symbol
  return unless slave_group_symbol.nil?

  Thread.current[CURRENT_LOAD_BALANCE_OPTIONS_KEY] = nil
end
#disable_query_cache! ⇒ Object
320 321 322 |
# File 'lib/octopus/proxy.rb', line 320
# Disables the query cache on every healthy shard that is already connected;
# shards whose connected? is falsy are skipped.
def disable_query_cache!
  with_each_healthy_shard do |pool|
    pool.connected? && safe_connection(pool).disable_query_cache!
  end
end
#enable_query_cache! ⇒ Object
315 316 317 318 |
# File 'lib/octopus/proxy.rb', line 315
# Clears the query cache first, then enables it on every healthy shard that
# is already connected; shards whose connected? is falsy are skipped.
def enable_query_cache!
  clear_query_cache
  with_each_healthy_shard do |pool|
    pool.connected? && safe_connection(pool).enable_query_cache!
  end
end
#fully_replicated? ⇒ Boolean
192 193 194 |
# File 'lib/octopus/proxy.rb', line 192
# Returns @fully_replicated when it is truthy; otherwise returns the value
# stored on Thread.current under FULLY_REPLICATED_KEY.
def fully_replicated?
  return @fully_replicated if @fully_replicated

  Thread.current[FULLY_REPLICATED_KEY]
end
#has_group?(group) ⇒ Boolean
Public: Whether or not a group exists with the given name converted to a string.
Returns a boolean.
200 201 202 |
# File 'lib/octopus/proxy.rb', line 200
# Public: Whether a group is registered under the given name.
#
# group - the group name; it is converted with to_s before the lookup, so
#         Symbols and Strings are interchangeable.
#
# Returns a boolean.
def has_group?(group)
  group_name = group.to_s
  @groups.include?(group_name)
end
#initialize_replication(config) ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/octopus/proxy.rb', line 88
# Sets up replication state from the given config.
#
# config - responds to key?/[] style lookup; its 'fully_replicated' entry
#          is used when the key is present, otherwise true is assumed.
#
# Marks the proxy as replicated, builds the sorted list of shard names (as
# Strings) without 'master', and hands that list to a new instance of
# Octopus.load_balancer.
def initialize_replication(config)
  @replicated = true
  @fully_replicated = config.fetch('fully_replicated', true)

  @slaves_list = @shards.keys.map(&:to_s).sort - ['master']
  @slaves_load_balancer = Octopus.load_balancer.new(@slaves_list)
end
#initialize_shards(config) ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/octopus/proxy.rb', line 23
# Builds the shard, group and slave-group tables from the Octopus config.
#
# config - the Octopus configuration, or nil. Its 'entire_sharded' entry and
#          its entry for Octopus.rails_env are read when it is not nil.
#
# Each entry of the environment's shard config is handled by its shape:
#   * a String value           - resolved with resolve_string_connection and
#                                registered as a shard;
#   * a Hash with an 'adapter' - registered as a shard, together with any
#                                nested slave groups it contains;
#   * any other Hash           - treated as a named group of shards.
#
# Raises RuntimeError ('You have duplicated shard names!') when a shard
# inside a group uses a name that is already present in @shards.
def initialize_shards(config)
  @shards = HashWithIndifferentAccess.new
  @shards_slave_groups = HashWithIndifferentAccess.new
  @slave_groups = HashWithIndifferentAccess.new
  @groups = {}
  @adapters = Set.new
  @config = ActiveRecord::Base.connection_pool_without_octopus.spec.config

  unless config.nil?
    @entire_sharded = config['entire_sharded']
    @shards_config = config[Octopus.rails_env]
  end

  # No config for this environment: iterate over nothing below.
  @shards_config ||= []

  @shards_config.each do |key, value|
    if value.is_a?(String)
      value = resolve_string_connection(value).merge(:octopus_shard => key)
      initialize_adapter(value['adapter'])
      @shards[key.to_sym] = connection_pool_for(value, "#{value['adapter']}_connection")
    elsif value.is_a?(Hash) && value.key?('adapter')
      # merge! tags the config hash itself (in place) with its shard name.
      value.merge!(:octopus_shard => key)
      initialize_adapter(value['adapter'])
      @shards[key.to_sym] = connection_pool_for(value, "#{value['adapter']}_connection")

      slave_group_configs = value.select do |_k, v|
        structurally_slave_group? v
      end

      if slave_group_configs.present?
        slave_groups = HashWithIndifferentAccess.new
        slave_group_configs.each do |slave_group_name, slave_configs|
          slaves = HashWithIndifferentAccess.new
          slave_configs.each do |slave_name, slave_config|
            # Slaves are registered as shards too, using the parent shard's adapter name.
            @shards[slave_name.to_sym] = connection_pool_for(slave_config, "#{value['adapter']}_connection")
            slaves[slave_name.to_sym] = slave_name.to_sym
          end
          slave_groups[slave_group_name.to_sym] = Octopus::SlaveGroup.new(slaves)
        end
        @shards_slave_groups[key.to_sym] = slave_groups
        @sharded = true
      end
    elsif value.is_a?(Hash)
      # Group names are kept as Strings; the shard names inside as Symbols.
      @groups[key.to_s] = []

      value.each do |k, v|
        fail 'You have duplicated shard names!' if @shards.key?(k.to_sym)

        initialize_adapter(v['adapter'])
        config_with_octopus_shard = v.merge(:octopus_shard => k)

        @shards[k.to_sym] = connection_pool_for(config_with_octopus_shard, "#{v['adapter']}_connection")
        @groups[key.to_s] << k.to_sym
      end

      if structurally_slave_group? value
        slaves = Hash[@groups[key.to_s].map { |v| [v, v] }]
        @slave_groups[key.to_sym] = Octopus::SlaveGroup.new(slaves)
      end
    end
  end

  # Only when the master shard is :master and the config did not define it.
  @shards[:master] ||= ActiveRecord::Base.connection_pool_without_octopus if Octopus.master_shard == :master
end
#last_current_shard ⇒ Object
184 185 186 |
# File 'lib/octopus/proxy.rb', line 184 def last_current_shard Thread.current[LAST_CURRENT_SHARD_KEY] end |
#last_current_shard=(last_current_shard) ⇒ Object
188 189 190 |
# File 'lib/octopus/proxy.rb', line 188 def last_current_shard=(last_current_shard) Thread.current[LAST_CURRENT_SHARD_KEY] = last_current_shard end |
#respond_to?(method, include_private = false) ⇒ Boolean
307 308 309 |
# File 'lib/octopus/proxy.rb', line 307
# Reports whether the proxy can handle a method: either it responds itself
# (checked via super) or the currently selected connection does.
#
# method          - the method name to check.
# include_private - forwarded to the connection's respond_to? check.
def respond_to?(method, include_private = false)
  answered_here = super
  return answered_here if answered_here

  select_connection.respond_to?(method, include_private)
end
#run_queries_on_shard(shard, &_block) ⇒ Object
243 244 245 246 247 248 249 |
# File 'lib/octopus/proxy.rb', line 243
# Yields to the caller's block inside using_shard(shard), itself wrapped in
# keeping_connection_proxy(shard).
#
# Returns whatever keeping_connection_proxy returns.
def run_queries_on_shard(shard, &_block)
  keeping_connection_proxy(shard) do
    using_shard(shard) { yield }
  end
end
#safe_connection(connection_pool) ⇒ Object
Rails 3.1 sets automatic_reconnect to false when it removes connection pool. Octopus can potentially retain a reference to a closed connection pool. Previously, that would work since the pool would just reconnect, but in Rails 3.1 the flag prevents this.
223 224 225 226 227 228 229 |
# File 'lib/octopus/proxy.rb', line 223
# Returns a connection from the given pool, making sure the pool is allowed
# to reconnect first.
#
# Rails 3.1 sets automatic_reconnect to false when it removes a connection
# pool, and Octopus can retain a reference to such a closed pool, so the flag
# is switched back on here when it is falsy.
#
# When the pool is not connected yet and the master shard's connection has
# its query cache enabled, the query cache is enabled on this pool's
# connection as well.
def safe_connection(connection_pool)
  connection_pool.automatic_reconnect ||= true

  unless connection_pool.connected?
    if @shards[Octopus.master_shard].connection.query_cache_enabled
      connection_pool.connection.enable_query_cache!
    end
  end

  connection_pool.connection
end
#select_connection ⇒ Object
231 232 233 |
# File 'lib/octopus/proxy.rb', line 231
# Returns a connection for the shard named by shard_name, obtained through
# safe_connection.
def select_connection
  pool = @shards[shard_name]
  safe_connection(pool)
end
#send_queries_to_all_shards(&block) ⇒ Object
263 264 265 |
# File 'lib/octopus/proxy.rb', line 263
# Runs the block once per distinct entry of @shards: shard names that map to
# the same @shards value are de-duplicated, keeping the first name.
#
# Returns the result of send_queries_to_multiple_shards.
def send_queries_to_all_shards(&block)
  distinct_names = shard_names.uniq { |name| @shards[name] }
  send_queries_to_multiple_shards(distinct_names, &block)
end
#send_queries_to_group(group, &block) ⇒ Object
257 258 259 260 261 |
# File 'lib/octopus/proxy.rb', line 257
# Runs the block on every shard of the given group, inside using_group(group).
def send_queries_to_group(group, &block)
  using_group(group) { send_queries_to_multiple_shards(shards_for_group(group), &block) }
end
#send_queries_to_multiple_shards(shards, &block) ⇒ Object
251 252 253 254 255 |
# File 'lib/octopus/proxy.rb', line 251
# Runs the block on each of the given shards via run_queries_on_shard.
#
# Returns an Array with one result per shard, in the order given.
def send_queries_to_multiple_shards(shards, &block)
  shards.map { |name| run_queries_on_shard(name, &block) }
end
#send_queries_to_shard_slave_group(method, *args, &block) ⇒ Object
344 345 346 |
# File 'lib/octopus/proxy.rb', line 344
# Forwards the call through send_queries_to_balancer, using the slave group
# found under the current shard and current slave group in
# @shards_slave_groups.
def send_queries_to_shard_slave_group(method, *args, &block)
  slave_group = @shards_slave_groups[current_shard][current_slave_group]
  send_queries_to_balancer(slave_group, method, *args, &block)
end
#send_queries_to_slave_group(method, *args, &block) ⇒ Object
352 353 354 |
# File 'lib/octopus/proxy.rb', line 352
# Forwards the call through send_queries_to_balancer, using the entry of
# @slave_groups found under the current slave group.
def send_queries_to_slave_group(method, *args, &block)
  slave_group = @slave_groups[current_slave_group]
  send_queries_to_balancer(slave_group, method, *args, &block)
end
#shard_name ⇒ Object
235 236 237 |
# File 'lib/octopus/proxy.rb', line 235
# Returns a single shard name: the first element when the current shard is
# an Array, otherwise the current shard itself.
def shard_name
  return current_shard.first if current_shard.is_a?(Array)

  current_shard
end
#shard_names ⇒ Object
Public: Retrieves names of all loaded shards.
Returns an array of shard names as symbols
207 208 209 |
# File 'lib/octopus/proxy.rb', line 207 def shard_names @shards.keys end |
#shards_for_group(group) ⇒ Object
Public: Retrieves the defined shards for a given group.
Returns an array of shard names as symbols or nil if the group is not defined.
215 216 217 |
# File 'lib/octopus/proxy.rb', line 215
# Public: Retrieves the shards defined for a given group.
#
# group - the group name; converted with to_s before the lookup.
#
# Returns the value registered for the group in @groups, or nil when the
# group is not defined.
def shards_for_group(group)
  @groups.fetch(group.to_s) { nil }
end
#should_clean_table_name? ⇒ Boolean
239 240 241 |
# File 'lib/octopus/proxy.rb', line 239
# Returns true when @adapters holds more than one entry, false otherwise.
def should_clean_table_name?
  @adapters.length > 1
end
#should_send_queries_to_shard_slave_group?(method) ⇒ Boolean
340 341 342 |
# File 'lib/octopus/proxy.rb', line 340
# Whether the call should go to a slave group of the current shard: slaves
# must be usable for the method, and @shards_slave_groups must hold a present
# entry for the current shard and current slave group.
#
# Returns the falsy result of should_use_slaves_for_method? as-is when
# slaves are not usable, otherwise the presence check.
def should_send_queries_to_shard_slave_group?(method)
  use_slaves = should_use_slaves_for_method?(method)
  return use_slaves unless use_slaves

  @shards_slave_groups.try(:[], current_shard).try(:[], current_slave_group).present?
end
#should_send_queries_to_slave_group?(method) ⇒ Boolean
348 349 350 |
# File 'lib/octopus/proxy.rb', line 348
# Whether the call should go to a slave group: slaves must be usable for the
# method, and @slave_groups must hold a present entry for the current slave
# group.
#
# Returns the falsy result of should_use_slaves_for_method? as-is when
# slaves are not usable, otherwise the presence check.
def should_send_queries_to_slave_group?(method)
  use_slaves = should_use_slaves_for_method?(method)
  return use_slaves unless use_slaves

  @slave_groups.try(:[], current_slave_group).present?
end
#transaction(options = {}, &block) ⇒ Object
280 281 282 283 284 285 286 287 288 |
# File 'lib/octopus/proxy.rb', line 280
# Opens a transaction on the selected connection.
#
# options - forwarded unchanged to the connection's transaction method
#           (default: {}).
# block   - the body of the transaction, forwarded to the connection.
#
# When the proxy is not sharded and the current model is replicated, the
# transaction is opened inside run_queries_on_shard(Octopus.master_shard);
# otherwise it is opened directly on the selected connection.
def transaction(options = {}, &block)
  if !sharded && current_model_replicated?
    run_queries_on_shard(Octopus.master_shard) do
      select_connection.transaction(options, &block)
    end
  else
    select_connection.transaction(options, &block)
  end
end