Class: Octopus::Proxy
- Inherits:
-
Object
- Object
- Octopus::Proxy
- Defined in:
- lib/octopus/proxy.rb
Instance Attribute Summary collapse
-
#config ⇒ Object
Returns the value of attribute config.
-
#sharded ⇒ Object
Returns the value of attribute sharded.
Instance Method Summary collapse
- #block ⇒ Object
- #block=(block) ⇒ Object
- #check_schema_migrations(shard) ⇒ Object
- #clean_connection_proxy ⇒ Object
- #clear_active_connections! ⇒ Object
- #clear_all_connections! ⇒ Object
- #clear_query_cache ⇒ Object
- #connected? ⇒ Boolean
- #connection_pool ⇒ Object
- #current_group ⇒ Object
- #current_group=(group_symbol) ⇒ Object
- #current_model ⇒ Object
- #current_model=(model) ⇒ Object
- #current_shard ⇒ Object
- #current_shard=(shard_symbol) ⇒ Object
- #current_slave_group ⇒ Object
- #current_slave_group=(slave_group_symbol) ⇒ Object
- #disable_query_cache! ⇒ Object
- #enable_query_cache! ⇒ Object
- #fully_replicated? ⇒ Boolean
-
#has_group?(group) ⇒ Boolean
Public: Whether or not a group exists with the given name converted to a string.
-
#initialize(config = Octopus.config) ⇒ Proxy
constructor
A new instance of Proxy.
- #initialize_replication(config) ⇒ Object
- #initialize_shards(config) ⇒ Object
- #last_current_shard ⇒ Object
- #last_current_shard=(last_current_shard) ⇒ Object
- #method_missing(method, *args, &block) ⇒ Object
- #respond_to?(method, include_private = false) ⇒ Boolean
- #run_queries_on_shard(shard, &_block) ⇒ Object
-
#safe_connection(connection_pool) ⇒ Object
Rails 3.1 sets automatic_reconnect to false when it removes connection pool.
- #select_connection ⇒ Object
- #send_queries_to_multiple_shards(shards, &block) ⇒ Object
- #send_queries_to_shard_slave_group(method, *args, &block) ⇒ Object
- #send_queries_to_slave_group(method, *args, &block) ⇒ Object
- #shard_name ⇒ Object
-
#shard_names ⇒ Object
Public: Retrieves names of all loaded shards.
-
#shards_for_group(group) ⇒ Object
Public: Retrieves the defined shards for a given group.
- #should_clean_table_name? ⇒ Boolean
- #should_send_queries_to_shard_slave_group?(method) ⇒ Boolean
- #should_send_queries_to_slave_group?(method) ⇒ Boolean
- #transaction(options = {}, &block) ⇒ Object
Constructor Details
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(method, *args, &block) ⇒ Object
257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 |
# File 'lib/octopus/proxy.rb', line 257 def method_missing(method, *args, &block) if should_clean_connection_proxy?(method) conn = select_connection self.last_current_shard = current_shard clean_connection_proxy conn.send(method, *args, &block) elsif should_send_queries_to_shard_slave_group?(method) send_queries_to_shard_slave_group(method, *args, &block) elsif should_send_queries_to_slave_group?(method) send_queries_to_slave_group(method, *args, &block) elsif should_send_queries_to_replicated_databases?(method) send_queries_to_selected_slave(method, *args, &block) else select_connection.send(method, *args, &block) end end |
Instance Attribute Details
#config ⇒ Object
Returns the value of attribute config.
7 8 9 |
# File 'lib/octopus/proxy.rb', line 7 def config @config end |
#sharded ⇒ Object
Returns the value of attribute sharded.
7 8 9 |
# File 'lib/octopus/proxy.rb', line 7 def sharded @sharded end |
Instance Method Details
#block ⇒ Object
156 157 158 |
# File 'lib/octopus/proxy.rb', line 156 def block Thread.current['octopus.block'] end |
#block=(block) ⇒ Object
160 161 162 |
# File 'lib/octopus/proxy.rb', line 160 def block=(block) Thread.current['octopus.block'] = block end |
#check_schema_migrations(shard) ⇒ Object
240 241 242 243 244 |
# File 'lib/octopus/proxy.rb', line 240 def check_schema_migrations(shard) OctopusModel.using(shard).connection.table_exists?( ActiveRecord::Migrator.schema_migrations_table_name, ) || OctopusModel.using(shard).connection.initialize_schema_migrations_table end |
#clean_connection_proxy ⇒ Object
234 235 236 237 238 |
# File 'lib/octopus/proxy.rb', line 234 def clean_connection_proxy self.current_shard = :master self.current_group = nil self.block = false end |
#clear_active_connections! ⇒ Object
295 296 297 |
# File 'lib/octopus/proxy.rb', line 295 def clear_active_connections! @shards.each { |_k, v| v.release_connection } end |
#clear_all_connections! ⇒ Object
299 300 301 |
# File 'lib/octopus/proxy.rb', line 299 def clear_all_connections! @shards.each { |_k, v| v.disconnect! } end |
#clear_query_cache ⇒ Object
291 292 293 |
# File 'lib/octopus/proxy.rb', line 291 def clear_query_cache @shards.each { |_k, v| safe_connection(v).clear_query_cache } end |
#connected? ⇒ Boolean
303 304 305 |
# File 'lib/octopus/proxy.rb', line 303 def connected? @shards.any? { |_k, v| v.connected? } end |
#connection_pool ⇒ Object
278 279 280 |
# File 'lib/octopus/proxy.rb', line 278 def connection_pool @shards[current_shard] end |
#current_group ⇒ Object
135 136 137 |
# File 'lib/octopus/proxy.rb', line 135 def current_group Thread.current['octopus.current_group'] end |
#current_group=(group_symbol) ⇒ Object
139 140 141 142 143 144 145 146 |
# File 'lib/octopus/proxy.rb', line 139 def current_group=(group_symbol) # TODO: Error message should include all groups if given more than one bad name. [group_symbol].flatten.compact.each do |group| fail "Nonexistent Group Name: #{group}" unless has_group?(group) end Thread.current['octopus.current_group'] = group_symbol end |
#current_model ⇒ Object
92 93 94 |
# File 'lib/octopus/proxy.rb', line 92 def current_model Thread.current['octopus.current_model'] end |
#current_model=(model) ⇒ Object
96 97 98 |
# File 'lib/octopus/proxy.rb', line 96 def current_model=(model) Thread.current['octopus.current_model'] = model.is_a?(ActiveRecord::Base) ? model.class : model end |
#current_shard ⇒ Object
100 101 102 |
# File 'lib/octopus/proxy.rb', line 100 def current_shard Thread.current['octopus.current_shard'] ||= :master end |
#current_shard=(shard_symbol) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/octopus/proxy.rb', line 104 def current_shard=(shard_symbol) self.current_slave_group = nil if shard_symbol.is_a?(Array) shard_symbol.each { |symbol| fail "Nonexistent Shard Name: #{symbol}" if @shards[symbol].nil? } elsif shard_symbol.is_a?(Hash) hash = shard_symbol shard_symbol = hash[:shard] slave_group_symbol = hash[:slave_group] if shard_symbol.nil? && slave_group_symbol.nil? fail 'Neither shard or slave group must be specified' end if shard_symbol.present? fail "Nonexistent Shard Name: #{shard_symbol}" if @shards[shard_symbol].nil? end if slave_group_symbol.present? if (@shards_slave_groups.try(:[], shard_symbol).present? && @shards_slave_groups[shard_symbol][slave_group_symbol].nil?) || (@shards_slave_groups.try(:[], shard_symbol).nil? && @slave_groups[slave_group_symbol].nil?) fail "Nonexistent Slave Group Name: #{slave_group_symbol} in shards config: #{@shards_config.inspect}" end self.current_slave_group = slave_group_symbol end else fail "Nonexistent Shard Name: #{shard_symbol}" if @shards[shard_symbol].nil? end Thread.current['octopus.current_shard'] = shard_symbol end |
#current_slave_group ⇒ Object
148 149 150 |
# File 'lib/octopus/proxy.rb', line 148 def current_slave_group Thread.current['octopus.current_slave_group'] end |
#current_slave_group=(slave_group_symbol) ⇒ Object
152 153 154 |
# File 'lib/octopus/proxy.rb', line 152 def current_slave_group=(slave_group_symbol) Thread.current['octopus.current_slave_group'] = slave_group_symbol end |
#disable_query_cache! ⇒ Object
287 288 289 |
# File 'lib/octopus/proxy.rb', line 287 def disable_query_cache! @shards.each { |_k, v| safe_connection(v).disable_query_cache! } end |
#enable_query_cache! ⇒ Object
282 283 284 285 |
# File 'lib/octopus/proxy.rb', line 282 def enable_query_cache! clear_query_cache @shards.each { |_k, v| safe_connection(v).enable_query_cache! } end |
#fully_replicated? ⇒ Boolean
172 173 174 |
# File 'lib/octopus/proxy.rb', line 172 def fully_replicated? @fully_replicated || Thread.current['octopus.fully_replicated'] end |
#has_group?(group) ⇒ Boolean
Public: Whether or not a group exists with the given name converted to a string.
Returns a boolean.
180 181 182 |
# File 'lib/octopus/proxy.rb', line 180 def has_group?(group) @groups.key?(group.to_s) end |
#initialize_replication(config) ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/octopus/proxy.rb', line 79 def initialize_replication(config) @replicated = true if config.key?('fully_replicated') @fully_replicated = config['fully_replicated'] else @fully_replicated = true end @slaves_list = @shards.keys.map { |sym| sym.to_s }.sort @slaves_list.delete('master') @slaves_load_balancer = Octopus::LoadBalancing::RoundRobin.new(@slaves_list) end |
#initialize_shards(config) ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/octopus/proxy.rb', line 14 def initialize_shards(config) @shards = HashWithIndifferentAccess.new @shards_slave_groups = HashWithIndifferentAccess.new @slave_groups = HashWithIndifferentAccess.new @groups = {} @adapters = Set.new @config = ActiveRecord::Base.connection_pool_without_octopus.connection.instance_variable_get(:@config) unless config.nil? @entire_sharded = config['entire_sharded'] @shards_config = config[Octopus.rails_env] end @shards_config ||= [] @shards_config.each do |key, value| if value.is_a?(String) value = resolve_string_connection(value).merge(:octopus_shard => key) initialize_adapter(value['adapter']) @shards[key.to_sym] = connection_pool_for(value, "#{value['adapter']}_connection") elsif value.is_a?(Hash) && value.key?('adapter') value.merge!(:octopus_shard => key) initialize_adapter(value['adapter']) @shards[key.to_sym] = connection_pool_for(value, "#{value['adapter']}_connection") slave_group_configs = value.select do |_k, v| structurally_slave_group? v end if slave_group_configs.present? slave_groups = HashWithIndifferentAccess.new slave_group_configs.each do |slave_group_name, slave_configs| slaves = HashWithIndifferentAccess.new slave_configs.each do |slave_name, slave_config| @shards[slave_name.to_sym] = connection_pool_for(slave_config, "#{value['adapter']}_connection") slaves[slave_name.to_sym] = slave_name.to_sym end slave_groups[slave_group_name.to_sym] = Octopus::SlaveGroup.new(slaves) end @shards_slave_groups[key.to_sym] = slave_groups @sharded = true end elsif value.is_a?(Hash) @groups[key.to_s] = [] value.each do |k, v| fail 'You have duplicated shard names!' if @shards.key?(k.to_sym) initialize_adapter(v['adapter']) config_with_octopus_shard = v.merge(:octopus_shard => k) @shards[k.to_sym] = connection_pool_for(config_with_octopus_shard, "#{v['adapter']}_connection") @groups[key.to_s] << k.to_sym end if structurally_slave_group? value slaves = Hash[@groups[key.to_s].map { |v| [v, v] }] @slave_groups[key.to_sym] = Octopus::SlaveGroup.new(slaves) end end end @shards[:master] ||= ActiveRecord::Base.connection_pool_without_octopus end |
#last_current_shard ⇒ Object
164 165 166 |
# File 'lib/octopus/proxy.rb', line 164 def last_current_shard Thread.current['octopus.last_current_shard'] end |
#last_current_shard=(last_current_shard) ⇒ Object
168 169 170 |
# File 'lib/octopus/proxy.rb', line 168 def last_current_shard=(last_current_shard) Thread.current['octopus.last_current_shard'] = last_current_shard end |
#respond_to?(method, include_private = false) ⇒ Boolean
274 275 276 |
# File 'lib/octopus/proxy.rb', line 274 def respond_to?(method, include_private = false) super || select_connection.respond_to?(method, include_private) end |
#run_queries_on_shard(shard, &_block) ⇒ Object
220 221 222 223 224 225 226 |
# File 'lib/octopus/proxy.rb', line 220 def run_queries_on_shard(shard, &_block) keeping_connection_proxy do using_shard(shard) do yield end end end |
#safe_connection(connection_pool) ⇒ Object
Rails 3.1 sets automatic_reconnect to false when it removes connection pool. Octopus can potentially retain a reference to a closed connection pool. Previously, that would work since the pool would just reconnect, but in Rails 3.1 the flag prevents this.
203 204 205 206 |
# File 'lib/octopus/proxy.rb', line 203 def safe_connection(connection_pool) connection_pool.automatic_reconnect ||= true connection_pool.connection end |
#select_connection ⇒ Object
208 209 210 |
# File 'lib/octopus/proxy.rb', line 208 def select_connection safe_connection(@shards[shard_name]) end |
#send_queries_to_multiple_shards(shards, &block) ⇒ Object
228 229 230 231 232 |
# File 'lib/octopus/proxy.rb', line 228 def send_queries_to_multiple_shards(shards, &block) shards.each do |shard| run_queries_on_shard(shard, &block) end end |
#send_queries_to_shard_slave_group(method, *args, &block) ⇒ Object
311 312 313 |
# File 'lib/octopus/proxy.rb', line 311 def send_queries_to_shard_slave_group(method, *args, &block) send_queries_to_balancer(@shards_slave_groups[current_shard][current_slave_group], method, *args, &block) end |
#send_queries_to_slave_group(method, *args, &block) ⇒ Object
319 320 321 |
# File 'lib/octopus/proxy.rb', line 319 def send_queries_to_slave_group(method, *args, &block) send_queries_to_balancer(@slave_groups[current_slave_group], method, *args, &block) end |
#shard_name ⇒ Object
212 213 214 |
# File 'lib/octopus/proxy.rb', line 212 def shard_name current_shard.is_a?(Array) ? current_shard.first : current_shard end |
#shard_names ⇒ Object
Public: Retrieves names of all loaded shards.
Returns an array of shard names as symbols
187 188 189 |
# File 'lib/octopus/proxy.rb', line 187 def shard_names @shards.keys end |
#shards_for_group(group) ⇒ Object
Public: Retrieves the defined shards for a given group.
Returns an array of shard names as symbols or nil if the group is not defined.
195 196 197 |
# File 'lib/octopus/proxy.rb', line 195 def shards_for_group(group) @groups.fetch(group.to_s, nil) end |
#should_clean_table_name? ⇒ Boolean
216 217 218 |
# File 'lib/octopus/proxy.rb', line 216 def should_clean_table_name? @adapters.size > 1 end |
#should_send_queries_to_shard_slave_group?(method) ⇒ Boolean
307 308 309 |
# File 'lib/octopus/proxy.rb', line 307 def should_send_queries_to_shard_slave_group?(method) should_use_slaves_for_method?(method) && @shards_slave_groups.try(:[], current_shard).try(:[], current_slave_group).present? end |
#should_send_queries_to_slave_group?(method) ⇒ Boolean
315 316 317 |
# File 'lib/octopus/proxy.rb', line 315 def should_send_queries_to_slave_group?(method) should_use_slaves_for_method?(method) && @slave_groups.try(:[], current_slave_group).present? end |
#transaction(options = {}, &block) ⇒ Object
246 247 248 249 250 251 252 253 254 255 |
# File 'lib/octopus/proxy.rb', line 246 def transaction( = {}, &block) replicated = @replicated && (current_model.replicated || fully_replicated?) if !sharded && replicated run_queries_on_shard(:master) do select_connection.transaction(, &block) end else select_connection.transaction(, &block) end end |