Class: GlobalUid::Base

Inherits:
Object
  • Object
show all
Defined in:
lib/global_uid/base.rb

Constant Summary collapse

GLOBAL_UID_DEFAULTS =
{
  :connection_timeout   => 3,
  :connection_retry     => 10.minutes,
  :notifier             => Proc.new { |exception, message| ActiveRecord::Base.logger.error("GlobalUID error:  #{exception} #{message}") },
  :query_timeout        => 10,
  :increment_by         => 5,  # This will define the maximum number of servers that you can have
  :disabled             => false,
  :per_process_affinity => true,
  :dry_run              => false
}
GlobalUidTimer =
Timeout
@@servers =
nil

Class Method Summary collapse

Class Method Details

.create_uid_tables(id_table_name, options = {}) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/global_uid/base.rb', line 27

def self.create_uid_tables(id_table_name, options={})
  type     = options[:uid_type] || "bigint(21) UNSIGNED"
  start_id = options[:start_id] || 1

  # TODO it would be nice to be able to set the engine or something to not be MySQL specific
  with_connections do |connection|
    connection.execute("CREATE TABLE IF NOT EXISTS `#{id_table_name}` (
    `id` #{type} NOT NULL AUTO_INCREMENT,
    `stub` char(1) NOT NULL DEFAULT '',
    PRIMARY KEY (`id`),
    UNIQUE KEY `stub` (`stub`)
    )")

    # prime the pump on each server
    connection.execute("INSERT IGNORE INTO `#{id_table_name}` VALUES(#{start_id}, 'a')")
  end
end

.disconnect!Object



114
115
116
# File 'lib/global_uid/base.rb', line 114

def self.disconnect!
  @@servers = nil
end

.drop_uid_tables(id_table_name, options = {}) ⇒ Object



45
46
47
48
49
# File 'lib/global_uid/base.rb', line 45

def self.drop_uid_tables(id_table_name, options={})
  with_connections do |connection|
    connection.execute("DROP TABLE IF EXISTS `#{id_table_name}`")
  end
end

.get_connections(options = {}) ⇒ Object



184
185
186
# File 'lib/global_uid/base.rb', line 184

def self.get_connections(options = {})
  with_connections {}
end

.get_uid_for_class(klass, options = {}) ⇒ Object



188
189
190
191
192
193
194
195
196
197
# File 'lib/global_uid/base.rb', line 188

def self.get_uid_for_class(klass, options = {})
  with_connections do |connection|
    timeout = self.global_uid_options[:query_timeout]
    GlobalUidTimer.timeout(self.global_uid_options[:query_timeout], TimeoutException) do
      id = connection.insert("REPLACE INTO #{klass.global_uid_table} (stub) VALUES ('a')")
      return id
    end
  end
  raise NoServersAvailableException, "All global UID servers are gone!"
end

.global_uid_optionsObject



203
204
205
# File 'lib/global_uid/base.rb', line 203

def self.global_uid_options
  @global_uid_options
end

.global_uid_options=(options) ⇒ Object



199
200
201
# File 'lib/global_uid/base.rb', line 199

def self.global_uid_options=(options)
  @global_uid_options = GLOBAL_UID_DEFAULTS.merge(options.symbolize_keys)
end

.global_uid_serversObject



207
208
209
# File 'lib/global_uid/base.rb', line 207

def self.global_uid_servers
  self.global_uid_options[:id_servers]
end

.id_table_from_name(name) ⇒ Object



211
212
213
# File 'lib/global_uid/base.rb', line 211

def self.id_table_from_name(name)
  "#{name}_ids".to_sym
end

.init_server_info(options) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/global_uid/base.rb', line 93

def self.init_server_info(options)
  id_servers = self.global_uid_servers

  raise "You haven't configured any id servers" if id_servers.nil? or id_servers.empty?
  raise "More servers configured than increment_by: #{id_servers.size} > #{options[:increment_by]} -- this will create duplicate IDs." if id_servers.size > options[:increment_by]

  offset = 1

  id_servers.map do |name, i|
    info = {}
    info[:cx]       = nil
    info[:name]     = name
    info[:retry_at] = nil
    info[:offset]   = offset
    info[:rand]     = rand
    info[:new?]     = true
    offset +=1
    info
  end
end

.new_connection(name, connection_timeout, offset, increment_by, use_server_variables) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/global_uid/base.rb', line 62

def self.new_connection(name, connection_timeout, offset, increment_by, use_server_variables)
  raise "No id server '#{name}' configured in database.yml" unless ActiveRecord::Base.configurations.has_key?(name)
  config = ActiveRecord::Base.configurations[name]
  c = config.symbolize_keys

  raise "No global_uid support for adapter #{c[:adapter]}" unless ['mysql', 'mysql2'].include?(c[:adapter])

  con = nil
  begin
    GlobalUidTimer.timeout(connection_timeout, ConnectionTimeoutException) do
      con = ActiveRecord::Base.send("#{c[:adapter]}_connection", config)
    end
  rescue ConnectionTimeoutException => e
    notify e, "Timed out establishing a connection to #{name}"
    return nil
  rescue Exception => e
    notify e, "establishing a connection to #{name}: #{e.message}"
    return nil
  end

  # Please note that this is unreliable -- if you lose your CX to the server
  # and auto-reconnect, you will be utterly hosed.  Much better to dedicate a server
  # or two to the cause, and set their auto_increment_increment globally.
  if use_server_variables
    con.execute("set @@auto_increment_increment = #{increment_by}")
    con.execute("set @@auto_increment_offset = #{offset}")
  end

  con
end

.notify(exception, message) ⇒ Object



178
179
180
181
182
# File 'lib/global_uid/base.rb', line 178

def self.notify(exception, message)
  if self.global_uid_options[:notifier]
    self.global_uid_options[:notifier].call(exception, message)
  end
end

.setup_connections!(options) ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/global_uid/base.rb', line 118

def self.setup_connections!(options)
  connection_timeout = options[:connection_timeout]
  increment_by       = options[:increment_by]

  if @@servers.nil?
    @@servers = init_server_info(options)
    # sorting here sets up each process to have affinity to a particular server.
    @@servers = @@servers.sort_by { |s| s[:rand] }
  end

  @@servers.each do |info|
    next if info[:cx]

    if info[:new?] || ( info[:retry_at] && Time.now > info[:retry_at] )
      info[:new?] = false

      connection = new_connection(info[:name], connection_timeout, info[:offset], increment_by, options[:use_server_variables])
      info[:cx]  = connection
      info[:retry_at] = Time.now + options[:connection_retry] if connection.nil?
    end
  end

  @@servers
end

.with_connections(options = {}) ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/global_uid/base.rb', line 143

def self.with_connections(options = {})
  options = self.global_uid_options.merge(options)
  servers = setup_connections!(options)

  if !options[:per_process_affinity]
    servers = servers.sort_by { rand } #yes, I know it's not true random.
  end

  raise NoServersAvailableException if servers.empty?

  exception_count = 0

  errors = []
  servers.each do |s|
    begin
      yield s[:cx] if s[:cx]
    rescue TimeoutException, Exception => e
      notify e, "#{e.message}"
      errors << e
      s[:cx] = nil
      s[:retry_at] = Time.now + 10.minutes
    end
  end

  # in the case where all servers are gone, put everyone back in.
  if servers.all? { |info| info[:cx].nil? }
    servers.each do |info|
      info[:retry_at] = Time.now - 5.minutes
    end
    raise NoServersAvailableException, "Errors hit: #{errors.map(&:to_s).join(',')}"
  end

  servers.map { |s| s[:cx] }.compact
end