Class: Octopus::Proxy
- Inherits:
-
Object
- Object
- Octopus::Proxy
- Defined in:
- lib/octopus/proxy.rb
Instance Attribute Summary collapse
-
#config ⇒ Object
Returns the value of attribute config.
-
#sharded ⇒ Object
Returns the value of attribute sharded.
Instance Method Summary collapse
- #block ⇒ Object
- #block=(block) ⇒ Object
- #check_schema_migrations(shard) ⇒ Object
- #clean_connection_proxy ⇒ Object
- #clear_active_connections! ⇒ Object
- #clear_all_connections! ⇒ Object
- #clear_query_cache ⇒ Object
- #connected? ⇒ Boolean
- #connection_pool ⇒ Object
- #current_group ⇒ Object
- #current_group=(group_symbol) ⇒ Object
- #current_model ⇒ Object
- #current_model=(model) ⇒ Object
- #current_shard ⇒ Object
- #current_shard=(shard_symbol) ⇒ Object
- #current_slave_group ⇒ Object
- #current_slave_group=(slave_group_symbol) ⇒ Object
- #disable_query_cache! ⇒ Object
- #enable_query_cache! ⇒ Object
- #fully_replicated? ⇒ Boolean
-
#has_group?(group) ⇒ Boolean
Public: Whether or not a group exists with the given name converted to a string.
-
#initialize(config = Octopus.config) ⇒ Proxy
constructor
A new instance of Proxy.
- #initialize_replication(config) ⇒ Object
- #initialize_shards(config) ⇒ Object
- #last_current_shard ⇒ Object
- #last_current_shard=(last_current_shard) ⇒ Object
- #method_missing(method, *args, &block) ⇒ Object
- #respond_to?(method, include_private = false) ⇒ Boolean
- #run_queries_on_shard(shard, &_block) ⇒ Object
-
#safe_connection(connection_pool) ⇒ Object
Rails 3.1 sets automatic_reconnect to false when it removes connection pool.
- #select_connection ⇒ Object
- #send_queries_to_multiple_shards(shards, &block) ⇒ Object
- #send_queries_to_shard_slave_group(method, *args, &block) ⇒ Object
- #send_queries_to_slave_group(method, *args, &block) ⇒ Object
- #shard_name ⇒ Object
-
#shard_names ⇒ Object
Public: Retrieves names of all loaded shards.
-
#shards_for_group(group) ⇒ Object
Public: Retrieves the defined shards for a given group.
- #should_clean_table_name? ⇒ Boolean
- #should_send_queries_to_shard_slave_group?(method) ⇒ Boolean
- #should_send_queries_to_slave_group?(method) ⇒ Boolean
- #transaction(options = {}, &block) ⇒ Object
Constructor Details
#initialize(config = Octopus.config) ⇒ Proxy
Returns a new instance of Proxy.
9 10 11 12 |
# File 'lib/octopus/proxy.rb', line 9 def initialize(config = Octopus.config) initialize_shards(config) initialize_replication(config) if !config.nil? && config['replicated'] end |
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(method, *args, &block) ⇒ Object
260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 |
# File 'lib/octopus/proxy.rb', line 260 def method_missing(method, *args, &block) if should_clean_connection_proxy?(method) conn = select_connection self.last_current_shard = current_shard clean_connection_proxy conn.send(method, *args, &block) elsif should_send_queries_to_shard_slave_group?(method) send_queries_to_shard_slave_group(method, *args, &block) elsif should_send_queries_to_slave_group?(method) send_queries_to_slave_group(method, *args, &block) elsif should_send_queries_to_replicated_databases?(method) send_queries_to_selected_slave(method, *args, &block) else select_connection.send(method, *args, &block) end end |
Instance Attribute Details
#config ⇒ Object
Returns the value of attribute config.
7 8 9 |
# File 'lib/octopus/proxy.rb', line 7 def config @config end |
#sharded ⇒ Object
Returns the value of attribute sharded.
7 8 9 |
# File 'lib/octopus/proxy.rb', line 7 def sharded @sharded end |
Instance Method Details
#block ⇒ Object
156 157 158 |
# File 'lib/octopus/proxy.rb', line 156 def block Thread.current['octopus.block'] end |
#block=(block) ⇒ Object
160 161 162 |
# File 'lib/octopus/proxy.rb', line 160 def block=(block) Thread.current['octopus.block'] = block end |
#check_schema_migrations(shard) ⇒ Object
244 245 246 247 248 |
# File 'lib/octopus/proxy.rb', line 244 def check_schema_migrations(shard) OctopusModel.using(shard).connection.table_exists?( ActiveRecord::Migrator.schema_migrations_table_name, ) || OctopusModel.using(shard).connection.initialize_schema_migrations_table end |
#clean_connection_proxy ⇒ Object
237 238 239 240 241 242 |
# File 'lib/octopus/proxy.rb', line 237 def clean_connection_proxy self.current_shard = :master self.current_model = nil self.current_group = nil self.block = nil end |
#clear_active_connections! ⇒ Object
298 299 300 |
# File 'lib/octopus/proxy.rb', line 298 def clear_active_connections! with_each_healthy_shard(&:release_connection) end |
#clear_all_connections! ⇒ Object
302 303 304 |
# File 'lib/octopus/proxy.rb', line 302 def clear_all_connections! with_each_healthy_shard(&:disconnect!) end |
#clear_query_cache ⇒ Object
294 295 296 |
# File 'lib/octopus/proxy.rb', line 294 def clear_query_cache with_each_healthy_shard { |v| v.connected? && safe_connection(v).clear_query_cache } end |
#connected? ⇒ Boolean
306 307 308 |
# File 'lib/octopus/proxy.rb', line 306 def connected? @shards.any? { |_k, v| v.connected? } end |
#connection_pool ⇒ Object
281 282 283 |
# File 'lib/octopus/proxy.rb', line 281 def connection_pool @shards[current_shard] end |
#current_group ⇒ Object
135 136 137 |
# File 'lib/octopus/proxy.rb', line 135 def current_group Thread.current['octopus.current_group'] end |
#current_group=(group_symbol) ⇒ Object
139 140 141 142 143 144 145 146 |
# File 'lib/octopus/proxy.rb', line 139 def current_group=(group_symbol) # TODO: Error message should include all groups if given more than one bad name. [group_symbol].flatten.compact.each do |group| fail "Nonexistent Group Name: #{group}" unless has_group?(group) end Thread.current['octopus.current_group'] = group_symbol end |
#current_model ⇒ Object
92 93 94 |
# File 'lib/octopus/proxy.rb', line 92 def current_model Thread.current['octopus.current_model'] end |
#current_model=(model) ⇒ Object
96 97 98 |
# File 'lib/octopus/proxy.rb', line 96 def current_model=(model) Thread.current['octopus.current_model'] = model.is_a?(ActiveRecord::Base) ? model.class : model end |
#current_shard ⇒ Object
100 101 102 |
# File 'lib/octopus/proxy.rb', line 100 def current_shard Thread.current['octopus.current_shard'] ||= :master end |
#current_shard=(shard_symbol) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/octopus/proxy.rb', line 104 def current_shard=(shard_symbol) self.current_slave_group = nil if shard_symbol.is_a?(Array) shard_symbol.each { |symbol| fail "Nonexistent Shard Name: #{symbol}" if @shards[symbol].nil? } elsif shard_symbol.is_a?(Hash) hash = shard_symbol shard_symbol = hash[:shard] slave_group_symbol = hash[:slave_group] if shard_symbol.nil? && slave_group_symbol.nil? fail 'Neither shard or slave group must be specified' end if shard_symbol.present? fail "Nonexistent Shard Name: #{shard_symbol}" if @shards[shard_symbol].nil? end if slave_group_symbol.present? if (@shards_slave_groups.try(:[], shard_symbol).present? && @shards_slave_groups[shard_symbol][slave_group_symbol].nil?) || (@shards_slave_groups.try(:[], shard_symbol).nil? && @slave_groups[slave_group_symbol].nil?) fail "Nonexistent Slave Group Name: #{slave_group_symbol} in shards config: #{@shards_config.inspect}" end self.current_slave_group = slave_group_symbol end else fail "Nonexistent Shard Name: #{shard_symbol}" if @shards[shard_symbol].nil? end Thread.current['octopus.current_shard'] = shard_symbol end |
#current_slave_group ⇒ Object
148 149 150 |
# File 'lib/octopus/proxy.rb', line 148 def current_slave_group Thread.current['octopus.current_slave_group'] end |
#current_slave_group=(slave_group_symbol) ⇒ Object
152 153 154 |
# File 'lib/octopus/proxy.rb', line 152 def current_slave_group=(slave_group_symbol) Thread.current['octopus.current_slave_group'] = slave_group_symbol end |
#disable_query_cache! ⇒ Object
290 291 292 |
# File 'lib/octopus/proxy.rb', line 290 def disable_query_cache! with_each_healthy_shard { |v| v.connected? && safe_connection(v).disable_query_cache! } end |
#enable_query_cache! ⇒ Object
285 286 287 288 |
# File 'lib/octopus/proxy.rb', line 285 def enable_query_cache! clear_query_cache with_each_healthy_shard { |v| v.connected? && safe_connection(v).enable_query_cache! } end |
#fully_replicated? ⇒ Boolean
172 173 174 |
# File 'lib/octopus/proxy.rb', line 172 def fully_replicated? @fully_replicated || Thread.current['octopus.fully_replicated'] end |
#has_group?(group) ⇒ Boolean
Public: Whether or not a group exists with the given name converted to a string.
Returns a boolean.
180 181 182 |
# File 'lib/octopus/proxy.rb', line 180 def has_group?(group) @groups.key?(group.to_s) end |
#initialize_replication(config) ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/octopus/proxy.rb', line 79 def initialize_replication(config) @replicated = true if config.key?('fully_replicated') @fully_replicated = config['fully_replicated'] else @fully_replicated = true end @slaves_list = @shards.keys.map(&:to_s).sort @slaves_list.delete('master') @slaves_load_balancer = Octopus::LoadBalancing::RoundRobin.new(@slaves_list) end |
#initialize_shards(config) ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/octopus/proxy.rb', line 14 def initialize_shards(config) @shards = HashWithIndifferentAccess.new @shards_slave_groups = HashWithIndifferentAccess.new @slave_groups = HashWithIndifferentAccess.new @groups = {} @adapters = Set.new @config = ActiveRecord::Base.connection_pool_without_octopus.spec.config unless config.nil? @entire_sharded = config['entire_sharded'] @shards_config = config[Octopus.rails_env] end @shards_config ||= [] @shards_config.each do |key, value| if value.is_a?(String) value = resolve_string_connection(value).merge(:octopus_shard => key) initialize_adapter(value['adapter']) @shards[key.to_sym] = connection_pool_for(value, "#{value['adapter']}_connection") elsif value.is_a?(Hash) && value.key?('adapter') value.merge!(:octopus_shard => key) initialize_adapter(value['adapter']) @shards[key.to_sym] = connection_pool_for(value, "#{value['adapter']}_connection") slave_group_configs = value.select do |_k, v| structurally_slave_group? v end if slave_group_configs.present? slave_groups = HashWithIndifferentAccess.new slave_group_configs.each do |slave_group_name, slave_configs| slaves = HashWithIndifferentAccess.new slave_configs.each do |slave_name, slave_config| @shards[slave_name.to_sym] = connection_pool_for(slave_config, "#{value['adapter']}_connection") slaves[slave_name.to_sym] = slave_name.to_sym end slave_groups[slave_group_name.to_sym] = Octopus::SlaveGroup.new(slaves) end @shards_slave_groups[key.to_sym] = slave_groups @sharded = true end elsif value.is_a?(Hash) @groups[key.to_s] = [] value.each do |k, v| fail 'You have duplicated shard names!' if @shards.key?(k.to_sym) initialize_adapter(v['adapter']) config_with_octopus_shard = v.merge(:octopus_shard => k) @shards[k.to_sym] = connection_pool_for(config_with_octopus_shard, "#{v['adapter']}_connection") @groups[key.to_s] << k.to_sym end if structurally_slave_group? value slaves = Hash[@groups[key.to_s].map { |v| [v, v] }] @slave_groups[key.to_sym] = Octopus::SlaveGroup.new(slaves) end end end @shards[:master] ||= ActiveRecord::Base.connection_pool_without_octopus end |
#last_current_shard ⇒ Object
164 165 166 |
# File 'lib/octopus/proxy.rb', line 164 def last_current_shard Thread.current['octopus.last_current_shard'] end |
#last_current_shard=(last_current_shard) ⇒ Object
168 169 170 |
# File 'lib/octopus/proxy.rb', line 168 def last_current_shard=(last_current_shard) Thread.current['octopus.last_current_shard'] = last_current_shard end |
#respond_to?(method, include_private = false) ⇒ Boolean
277 278 279 |
# File 'lib/octopus/proxy.rb', line 277 def respond_to?(method, include_private = false) super || select_connection.respond_to?(method, include_private) end |
#run_queries_on_shard(shard, &_block) ⇒ Object
223 224 225 226 227 228 229 |
# File 'lib/octopus/proxy.rb', line 223 def run_queries_on_shard(shard, &_block) keeping_connection_proxy(shard) do using_shard(shard) do yield end end end |
#safe_connection(connection_pool) ⇒ Object
Rails 3.1 sets automatic_reconnect to false when it removes connection pool. Octopus can potentially retain a reference to a closed connection pool. Previously, that would work since the pool would just reconnect, but in Rails 3.1 the flag prevents this.
203 204 205 206 207 208 209 |
# File 'lib/octopus/proxy.rb', line 203 def safe_connection(connection_pool) connection_pool.automatic_reconnect ||= true if !connection_pool.connected? && @shards[:master].connection.query_cache_enabled connection_pool.connection.enable_query_cache! end connection_pool.connection end |
#select_connection ⇒ Object
211 212 213 |
# File 'lib/octopus/proxy.rb', line 211 def select_connection safe_connection(@shards[shard_name]) end |
#send_queries_to_multiple_shards(shards, &block) ⇒ Object
231 232 233 234 235 |
# File 'lib/octopus/proxy.rb', line 231 def send_queries_to_multiple_shards(shards, &block) shards.each do |shard| run_queries_on_shard(shard, &block) end end |
#send_queries_to_shard_slave_group(method, *args, &block) ⇒ Object
314 315 316 |
# File 'lib/octopus/proxy.rb', line 314 def send_queries_to_shard_slave_group(method, *args, &block) send_queries_to_balancer(@shards_slave_groups[current_shard][current_slave_group], method, *args, &block) end |
#send_queries_to_slave_group(method, *args, &block) ⇒ Object
322 323 324 |
# File 'lib/octopus/proxy.rb', line 322 def send_queries_to_slave_group(method, *args, &block) send_queries_to_balancer(@slave_groups[current_slave_group], method, *args, &block) end |
#shard_name ⇒ Object
215 216 217 |
# File 'lib/octopus/proxy.rb', line 215 def shard_name current_shard.is_a?(Array) ? current_shard.first : current_shard end |
#shard_names ⇒ Object
Public: Retrieves names of all loaded shards.
Returns an array of shard names as symbols
187 188 189 |
# File 'lib/octopus/proxy.rb', line 187 def shard_names @shards.keys end |
#shards_for_group(group) ⇒ Object
Public: Retrieves the defined shards for a given group.
Returns an array of shard names as symbols or nil if the group is not defined.
195 196 197 |
# File 'lib/octopus/proxy.rb', line 195 def shards_for_group(group) @groups.fetch(group.to_s, nil) end |
#should_clean_table_name? ⇒ Boolean
219 220 221 |
# File 'lib/octopus/proxy.rb', line 219 def should_clean_table_name? @adapters.size > 1 end |
#should_send_queries_to_shard_slave_group?(method) ⇒ Boolean
310 311 312 |
# File 'lib/octopus/proxy.rb', line 310 def should_send_queries_to_shard_slave_group?(method) should_use_slaves_for_method?(method) && @shards_slave_groups.try(:[], current_shard).try(:[], current_slave_group).present? end |
#should_send_queries_to_slave_group?(method) ⇒ Boolean
318 319 320 |
# File 'lib/octopus/proxy.rb', line 318 def should_send_queries_to_slave_group?(method) should_use_slaves_for_method?(method) && @slave_groups.try(:[], current_slave_group).present? end |
#transaction(options = {}, &block) ⇒ Object
250 251 252 253 254 255 256 257 258 |
# File 'lib/octopus/proxy.rb', line 250 def transaction( = {}, &block) if !sharded && current_model_replicated? run_queries_on_shard(:master) do select_connection.transaction(, &block) end else select_connection.transaction(, &block) end end |