Class: Octopus::Proxy

Inherits:
Object
  • Object
show all
Defined in:
lib/octopus/proxy.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config = Octopus.config) ⇒ Proxy

Returns a new instance of Proxy.



6
7
8
9
# File 'lib/octopus/proxy.rb', line 6

def initialize(config = Octopus.config)
  initialize_shards(config)
  initialize_replication(config) if !config.nil? && config["replicated"]
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(method, *args, &block) ⇒ Object



206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/octopus/proxy.rb', line 206

def method_missing(method, *args, &block)
  if should_clean_connection?(method)
    conn = select_connection()
    self.last_current_shard = self.current_shard
    clean_proxy()
    conn.send(method, *args, &block)
  elsif should_send_queries_to_replicated_databases?(method)
    send_queries_to_selected_slave(method, *args, &block)
  else
    select_connection().send(method, *args, &block)
  end
end

Instance Attribute Details

#configObject

Returns the value of attribute config.



4
5
6
# File 'lib/octopus/proxy.rb', line 4

def config
  @config
end

Instance Method Details

#blockObject



103
104
105
# File 'lib/octopus/proxy.rb', line 103

def block
  Thread.current["octopus.block"]
end

#block=(block) ⇒ Object



107
108
109
# File 'lib/octopus/proxy.rb', line 107

def block=(block)
  Thread.current["octopus.block"] = block
end

#check_schema_migrations(shard) ⇒ Object



190
191
192
193
194
# File 'lib/octopus/proxy.rb', line 190

def check_schema_migrations(shard)
  if !OctopusModel.using(shard).connection.table_exists?(ActiveRecord::Migrator.schema_migrations_table_name())
    OctopusModel.using(shard).connection.initialize_schema_migrations_table
  end
end

#clean_proxyObject



184
185
186
187
188
# File 'lib/octopus/proxy.rb', line 184

def clean_proxy()
  self.current_shard = :master
  self.current_group = nil
  self.block = false
end

#connection_poolObject



223
224
225
# File 'lib/octopus/proxy.rb', line 223

def connection_pool
  return @shards[current_shard]
end

#current_groupObject



90
91
92
# File 'lib/octopus/proxy.rb', line 90

def current_group
  Thread.current["octopus.current_group"]
end

#current_group=(group_symbol) ⇒ Object



94
95
96
97
98
99
100
101
# File 'lib/octopus/proxy.rb', line 94

def current_group=(group_symbol)
  # TODO: Error message should include all groups if given more than one bad name.
  [group_symbol].flatten.compact.each do |group|
    raise "Nonexistent Group Name: #{group}" unless has_group?(group)
  end

  Thread.current["octopus.current_group"] = group_symbol
end

#current_modelObject



68
69
70
# File 'lib/octopus/proxy.rb', line 68

def current_model
  Thread.current["octopus.current_model"]
end

#current_model=(model) ⇒ Object



72
73
74
# File 'lib/octopus/proxy.rb', line 72

def current_model=(model)
  Thread.current["octopus.current_model"] = model.is_a?(ActiveRecord::Base) ? model.class : model
end

#current_shardObject



76
77
78
# File 'lib/octopus/proxy.rb', line 76

def current_shard
  Thread.current["octopus.current_shard"] ||= :master
end

#current_shard=(shard_symbol) ⇒ Object



80
81
82
83
84
85
86
87
88
# File 'lib/octopus/proxy.rb', line 80

def current_shard=(shard_symbol)
  if shard_symbol.is_a?(Array)
    shard_symbol.each {|symbol| raise "Nonexistent Shard Name: #{symbol}" if @shards[symbol].nil? }
  else
    raise "Nonexistent Shard Name: #{shard_symbol}" if @shards[shard_symbol].nil?
  end

  Thread.current["octopus.current_shard"] = shard_symbol
end

#has_group?(group) ⇒ Boolean

Public: Whether or not a group exists with the given name converted to a string.

Returns a boolean.

Returns:

  • (Boolean)


123
124
125
# File 'lib/octopus/proxy.rb', line 123

def has_group?(group)
  @groups.has_key?(group.to_s)
end

#initialize_replication(config) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
# File 'lib/octopus/proxy.rb', line 56

def initialize_replication(config)
  @replicated = true
  if config.has_key?("fully_replicated")
    @fully_replicated = config["fully_replicated"]
  else
    @fully_replicated = true
  end
  @slaves_list = @shards.keys.map {|sym| sym.to_s}.sort
  @slaves_list.delete('master')
  @slave_index = 0
end

#initialize_shards(config) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/octopus/proxy.rb', line 11

def initialize_shards(config)
  @shards = HashWithIndifferentAccess.new
  @groups = {}
  @adapters = Set.new
  @shards[:master] = ActiveRecord::Base.connection_pool_without_octopus()
  @config = ActiveRecord::Base.connection_pool_without_octopus.connection.instance_variable_get(:@config)

  if !config.nil? && config.has_key?("verify_connection")
    @verify_connection = config["verify_connection"]
  else
    @verify_connection = false
  end

  if !config.nil?
    @entire_sharded = config['entire_sharded']
    shards_config = config[Octopus.rails_env()]
  end

  shards_config ||= []

  shards_config.each do |key, value|
    if value.is_a?(String) && Octopus.rails32?
      value = resolve_string_connection(value).merge(:octopus_shard => key)
      initialize_adapter(value['adapter'])
      @shards[key.to_sym] = connection_pool_for(value, "#{value['adapter']}_connection")
    elsif value.is_a?(Hash) && value.has_key?("adapter")
      value.merge!(:octopus_shard => key)
      initialize_adapter(value['adapter'])
      @shards[key.to_sym] = connection_pool_for(value, "#{value['adapter']}_connection")
    elsif value.is_a?(Hash)
      @groups[key.to_s] = []

      value.each do |k, v|
        raise "You have duplicated shard names!" if @shards.has_key?(k.to_sym)

        initialize_adapter(v['adapter'])
        config_with_octopus_shard = v.merge(:octopus_shard => k)

        @shards[k.to_sym] = connection_pool_for(config_with_octopus_shard, "#{v['adapter']}_connection")
        @groups[key.to_s] << k.to_sym
      end
    end
  end
end

#last_current_shardObject



111
112
113
# File 'lib/octopus/proxy.rb', line 111

def last_current_shard
  Thread.current["octopus.last_current_shard"]
end

#last_current_shard=(last_current_shard) ⇒ Object



115
116
117
# File 'lib/octopus/proxy.rb', line 115

def last_current_shard=(last_current_shard)
  Thread.current["octopus.last_current_shard"] = last_current_shard
end

#respond_to?(method, include_private = false) ⇒ Boolean

Returns:

  • (Boolean)


219
220
221
# File 'lib/octopus/proxy.rb', line 219

def respond_to?(method, include_private = false)
  super || select_connection.respond_to?(method, include_private)
end

#run_queries_on_shard(shard, &block) ⇒ Object



164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/octopus/proxy.rb', line 164

def run_queries_on_shard(shard, &block)
  older_shard = self.current_shard
  last_block = self.block

  begin
    self.block = true
    self.current_shard = shard
    yield
  ensure
    self.block = last_block || false
    self.current_shard = older_shard
  end
end

#select_connectionObject



142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/octopus/proxy.rb', line 142

def select_connection
  @shards[shard_name].verify_active_connections! if @verify_connection
  # Rails 3.1 sets automatic_reconnect to false when it removes
  # connection pool.  Octopus can potentially retain a reference to a closed
  # connection pool.  Previously, that would work since the pool would just
  # reconnect, but in Rails 3.1 the flag prevents this.
  if Octopus.rails31? || Octopus.rails32?
    if !@shards[shard_name].automatic_reconnect
      @shards[shard_name].automatic_reconnect = true
    end
  end
  @shards[shard_name].connection()
end

#send_queries_to_multiple_shards(shards, &block) ⇒ Object



178
179
180
181
182
# File 'lib/octopus/proxy.rb', line 178

def send_queries_to_multiple_shards(shards, &block)
  shards.each do |shard|
    self.run_queries_on_shard(shard, &block)
  end
end

#shard_nameObject



156
157
158
# File 'lib/octopus/proxy.rb', line 156

def shard_name
  current_shard.is_a?(Array) ? current_shard.first : current_shard
end

#shard_namesObject

Public: Retrieves names of all loaded shards.

Returns an array of shard names as symbols



130
131
132
# File 'lib/octopus/proxy.rb', line 130

def shard_names
  @shards.keys
end

#shards_for_group(group) ⇒ Object

Public: Retrieves the defined shards for a given group.

Returns an array of shard names as symbols or nil if the group is not defined.



138
139
140
# File 'lib/octopus/proxy.rb', line 138

def shards_for_group(group)
  @groups.fetch(group.to_s, nil)
end

#should_clean_table_name?Boolean

Returns:

  • (Boolean)


160
161
162
# File 'lib/octopus/proxy.rb', line 160

def should_clean_table_name?
  @adapters.size > 1
end

#transaction(options = {}, &block) ⇒ Object



196
197
198
199
200
201
202
203
204
# File 'lib/octopus/proxy.rb', line 196

def transaction(options = {}, &block)
  if @replicated && (current_model.replicated || @fully_replicated)
    self.run_queries_on_shard(:master) do
      select_connection.transaction(options, &block)
    end
  else
    select_connection.transaction(options, &block)
  end
end