Class: GlobalUid::Base

Inherits:
Object
  • Object
show all
Defined in:
lib/global_uid/base.rb

Constant Summary collapse

# Default settings; `global_uid_options=` merges caller-supplied options on top of these.
GLOBAL_UID_DEFAULTS =
{
  # Handed to GlobalUidTimer.timeout while a connection to an id server is being opened.
  :connection_timeout   => 3,
  # Added to Time.now to schedule the next attempt after a connection could not be opened.
  :connection_retry     => 10.minutes,
  # Called by `notify` with (exception, message); the default logs through ActiveRecord's logger.
  :notifier             => Proc.new { |exception, message| ActiveRecord::Base.logger.error("GlobalUID error:  #{exception} #{message}") },
  # Handed to GlobalUidTimer.timeout around each id-allocating query.
  :query_timeout        => 10,
  :increment_by         => 5,  # This will define the maximum number of servers that you can have
  # Not read by any method shown in this class -- NOTE(review): presumably switches the feature off; confirm.
  :disabled             => false,
  # When false, `with_connections` randomises the server order on every call.
  :per_process_affinity => true,
  # Not read by any method shown in this class -- NOTE(review): confirm intended meaning against callers.
  :dry_run              => false
}
# Alias for Timeout; every timeout in this class is taken through this constant.
# NOTE(review): presumably aliased so the timer can be swapped out (e.g. in tests) -- confirm.
GlobalUidTimer =
Timeout

Class Method Summary collapse

Class Method Details

.create_uid_tables(id_table_name, options = {}) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/global_uid/base.rb', line 35

# Creates the id-allocation table on every reachable id server and seeds it
# with a single stub row.
#
# @param id_table_name [String, Symbol] name of the table to create
# @param options [Hash] :uid_type  - SQL type of the id column (default "bigint(21) UNSIGNED")
#                       :start_id  - id given to the seed row (default 1)
# @return whatever `with_connections` returns
def self.create_uid_tables(id_table_name, options={})
  id_column_type = options[:uid_type] || "bigint(21) UNSIGNED"
  first_id       = options[:start_id] || 1
  storage_engine = global_uid_options[:storage_engine] || "MyISAM"

  # TODO it would be nice to be able to set the engine or something to not be MySQL specific
  create_sql = "CREATE TABLE IF NOT EXISTS `#{id_table_name}` (
    `id` #{id_column_type} NOT NULL AUTO_INCREMENT,
    `stub` char(1) NOT NULL DEFAULT '',
    PRIMARY KEY (`id`),
    UNIQUE KEY `stub` (`stub`)
    ) ENGINE=#{storage_engine}"

  # The seed row is what later REPLACE statements keep overwriting; INSERT IGNORE
  # leaves an already-seeded table untouched.
  seed_sql = "INSERT IGNORE INTO `#{id_table_name}` VALUES(#{first_id}, 'a')"

  with_connections do |connection|
    connection.execute(create_sql)
    connection.execute(seed_sql)
  end
end

.disconnect! ⇒ Object



116
117
118
# File 'lib/global_uid/base.rb', line 116

# Drops this thread's cached server list by assigning nil to `servers`.
# Only the reference is cleared here; no explicit close is issued on the
# connections. The next `setup_connections!` call sees `servers.nil?` and
# rebuilds the list from scratch.
def self.disconnect!
  self.servers = nil
end

.drop_uid_tables(id_table_name, options = {}) ⇒ Object



55
56
57
58
59
# File 'lib/global_uid/base.rb', line 55

# Drops the id-allocation table on every reachable id server.
#
# @param id_table_name [String, Symbol] name of the table to drop
# @param options [Hash] accepted but not used by this method
# @return whatever `with_connections` returns
def self.drop_uid_tables(id_table_name, options={})
  drop_sql = "DROP TABLE IF EXISTS `#{id_table_name}`"
  with_connections { |server_cx| server_cx.execute(drop_sql) }
end

.get_connections(options = {}) ⇒ Object



184
185
186
# File 'lib/global_uid/base.rb', line 184

# Returns the currently usable id-server connections, opening any that are
# due for a (re)connect as a side effect of going through `with_connections`.
#
# @param options [Hash] per-call overrides, forwarded to `with_connections`
#   (previously this argument was accepted but silently dropped)
# @return [Array] whatever `with_connections` returns
# @raise whatever `with_connections` raises when no server is usable
def self.get_connections(options = {})
  # The empty block means nothing is done per connection; only the returned list matters.
  with_connections(options) {}
end

.get_many_uids_for_class(klass, count, options = {}) ⇒ Object



198
199
200
201
202
203
204
205
206
207
208
# File 'lib/global_uid/base.rb', line 198

# Allocates `count` ids for `klass` in a single multi-row REPLACE against the
# first id server that answers.
#
# @param klass   object responding to `global_uid_table`
# @param count   number of ids wanted; anything not greater than zero yields []
# @param options [Hash] accepted but not used by this method
# @return [Array] the allocated ids, spaced by the server's auto_increment_increment
# @raise [NoServersAvailableException] when `with_connections` finishes without any
#   server having produced ids
def self.get_many_uids_for_class(klass, count, options = {})
  return [] unless count > 0

  with_connections do |connection|
    query_timeout = global_uid_options[:query_timeout]
    GlobalUidTimer.timeout(query_timeout, TimeoutException) do
      stride    = connection.select_value("SELECT @@auto_increment_increment")
      stub_rows = Array.new(count, "('a')").join(',')
      first_id  = connection.insert("REPLACE INTO #{klass.global_uid_table} (stub) VALUES #{stub_rows}")
      last_id   = first_id + (count - 1) * stride
      # Non-local return: the first server that succeeds ends the whole method.
      return first_id.step(last_id, stride).to_a
    end
  end

  raise NoServersAvailableException, "All global UID servers are gone!"
end

.get_uid_for_class(klass, options = {}) ⇒ Object



188
189
190
191
192
193
194
195
196
# File 'lib/global_uid/base.rb', line 188

# Allocates a single id for `klass` from the first id server that answers.
#
# @param klass   object responding to `global_uid_table`
# @param options [Hash] accepted but not used by this method
# @return the value returned by `connection.insert` for the REPLACE statement
# @raise [NoServersAvailableException] when `with_connections` finishes without any
#   server having produced an id
def self.get_uid_for_class(klass, options = {})
  with_connections do |connection|
    query_timeout = global_uid_options[:query_timeout]
    GlobalUidTimer.timeout(query_timeout, TimeoutException) do
      # Non-local return: the first server that succeeds ends the whole method.
      return connection.insert("REPLACE INTO #{klass.global_uid_table} (stub) VALUES ('a')")
    end
  end

  raise NoServersAvailableException, "All global UID servers are gone!"
end

.global_uid_options ⇒ Object



214
215
216
# File 'lib/global_uid/base.rb', line 214

# Returns the options hash stored by `global_uid_options=`
# (nil if nothing has been assigned to @global_uid_options yet).
def self.global_uid_options
  @global_uid_options
end

.global_uid_options=(options) ⇒ Object



210
211
212
# File 'lib/global_uid/base.rb', line 210

# Stores the effective configuration: the given options, with their keys
# symbolized, layered over GLOBAL_UID_DEFAULTS (given values win on conflict).
#
# @param options [Hash] must respond to `symbolize_keys`
def self.global_uid_options=(options)
  @global_uid_options = GLOBAL_UID_DEFAULTS.merge(options.symbolize_keys)
end

.global_uid_servers ⇒ Object



218
219
220
# File 'lib/global_uid/base.rb', line 218

# Returns the :id_servers entry of the configured options.
# GLOBAL_UID_DEFAULTS has no :id_servers key, so this is nil unless the
# caller supplied one through `global_uid_options=`.
def self.global_uid_servers
  self.global_uid_options[:id_servers]
end

.id_table_from_name(name) ⇒ Object



222
223
224
# File 'lib/global_uid/base.rb', line 222

# Builds the id-table name for `name` by appending "_ids".
#
# @param name [String, Symbol] base name
# @return [Symbol]
def self.id_table_from_name(name)
  :"#{name}_ids"
end

.init_server_info(options) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/global_uid/base.rb', line 95

# Builds one bookkeeping hash per configured id server.
#
# Each entry starts with no connection (:cx), no retry time (:retry_at),
# a 1-based :offset following the configured order, a random :rand value
# (used by `setup_connections!` to order the servers) and :new? set to true.
#
# @param options [Hash] must contain :increment_by
# @return [Array<Hash>] one entry per server, in configured order
# @raise [RuntimeError] when no servers are configured, or when more servers
#   are configured than :increment_by allows
def self.init_server_info(options)
  id_servers = global_uid_servers

  raise "You haven't configured any id servers" if id_servers.nil? || id_servers.empty?
  if id_servers.size > options[:increment_by]
    raise "More servers configured than increment_by: #{id_servers.size} > #{options[:increment_by]} -- this will create duplicate IDs."
  end

  id_servers.each_with_index.map do |name, index|
    {
      :cx       => nil,
      :name     => name,
      :retry_at => nil,
      :offset   => index + 1,
      :rand     => rand,
      :new?     => true
    }
  end
end

.new_connection(name, connection_timeout, offset, increment_by) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/global_uid/base.rb', line 72

# Opens a connection to the id server configured under `name`.
#
# Connection failures are reported through `notify` and turned into a nil
# return so the caller can schedule a retry. Only StandardError (and the
# connection timeout) is treated that way: Interrupt, SystemExit and other
# non-StandardError exceptions now propagate instead of being swallowed.
#
# @param name               key into ActiveRecord::Base.configurations
# @param connection_timeout value handed to GlobalUidTimer.timeout
# @param offset             not used by this method
# @param increment_by       not used by this method
# @return the adapter connection, or nil when it could not be established
# @raise [RuntimeError] when `name` is not configured or its adapter is not 'mysql2'
def self.new_connection(name, connection_timeout, offset, increment_by)
  raise "No id server '#{name}' configured in database.yml" unless ActiveRecord::Base.configurations.has_key?(name)
  config = ActiveRecord::Base.configurations[name]
  c = config.symbolize_keys

  raise "No global_uid support for adapter #{c[:adapter]}" if c[:adapter] != 'mysql2'

  con = nil
  begin
    GlobalUidTimer.timeout(connection_timeout, ConnectionTimeoutException) do
      con = ActiveRecord::Base.send("#{c[:adapter]}_connection", config)
    end
  rescue ConnectionTimeoutException => e
    notify e, "Timed out establishing a connection to #{name}"
    return nil
  rescue StandardError => e
    # Deliberately not `rescue Exception`: signals and exit requests must not be eaten here.
    notify e, "establishing a connection to #{name}: #{e.message}"
    return nil
  end

  con
end

.notify(exception, message) ⇒ Object



178
179
180
181
182
# File 'lib/global_uid/base.rb', line 178

# Reports a problem through the configured :notifier, if there is one.
#
# @param exception the error being reported
# @param message   [String] human-readable context
# @return the notifier's return value, or nil when no notifier is configured
def self.notify(exception, message)
  return unless global_uid_options[:notifier]

  global_uid_options[:notifier].call(exception, message)
end

.servers ⇒ Object



26
27
28
29
# File 'lib/global_uid/base.rb', line 26

# Returns the server list cached for the current thread in the current process.
def self.servers
  # Thread-local storage is inherited on `fork`, so the pid is part of the key.
  Thread.current["global_uid_servers_#{Process.pid}"]
end

.servers=(s) ⇒ Object



31
32
33
# File 'lib/global_uid/base.rb', line 31

# Stores the server list for the current thread in the current process
# (the pid is part of the thread-local key).
def self.servers=(s)
  Thread.current["global_uid_servers_#{Process.pid}"] = s
end

.setup_connections!(options) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/global_uid/base.rb', line 120

# Makes sure the cached server list exists and (re)connects every entry that
# is due for an attempt.
#
# An entry is attempted when it has no connection and is either brand new or
# its :retry_at time has passed. A failed attempt schedules the next one
# options[:connection_retry] from now.
#
# @param options [Hash] reads :connection_timeout, :increment_by and :connection_retry
# @return [Array<Hash>] the cached server entries
def self.setup_connections!(options)
  if servers.nil?
    # Sorting by the per-entry random value gives each process its own stable
    # server order, i.e. affinity to a particular server.
    self.servers = init_server_info(options).sort_by { |entry| entry[:rand] }
  end

  servers.each do |entry|
    next if entry[:cx]
    next unless entry[:new?] || (entry[:retry_at] && Time.now > entry[:retry_at])

    entry[:new?] = false
    entry[:cx]   = new_connection(entry[:name], options[:connection_timeout], entry[:offset], options[:increment_by])
    entry[:retry_at] = Time.now + options[:connection_retry] if entry[:cx].nil?
  end

  servers
end

.with_connections(options = {}) ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/global_uid/base.rb', line 145

# Yields each usable id-server connection in turn and returns the connections
# that are still usable afterwards.
#
# A server whose block invocation raises a TimeoutException or any
# StandardError is reported through `notify`, loses its connection and is
# scheduled for a retry in one minute. Non-StandardError exceptions
# (Interrupt, SystemExit, ...) now propagate untouched instead of being
# swallowed and counted as a server failure.
#
# @param options [Hash] per-call overrides merged over `global_uid_options`
# @yield [connection] once per server that currently has a connection
# @return [Array] the remaining live connections
# @raise [NoServersAvailableException] when there are no servers at all, or
#   when no server has a connection after the pass
def self.with_connections(options = {})
  options = self.global_uid_options.merge(options)
  servers = setup_connections!(options)

  # Without per-process affinity, pick a fresh random order on every call.
  servers = servers.shuffle unless options[:per_process_affinity]

  raise NoServersAvailableException if servers.empty?

  errors = []
  servers.each do |s|
    begin
      yield s[:cx] if s[:cx]
    rescue TimeoutException, StandardError => e
      notify e, e.message.to_s
      errors << e
      s[:cx] = nil
      s[:retry_at] = Time.now + 1.minute
    end
  end

  # in the case where all servers are gone, put everyone back in.
  if servers.all? { |info| info[:cx].nil? }
    # A retry time in the past makes every server eligible on the next call.
    servers.each { |info| info[:retry_at] = Time.now - 5.minutes }
    raise NoServersAvailableException, "Errors hit: #{errors.map(&:to_s).join(',')}"
  end

  servers.map { |s| s[:cx] }.compact
end