Module: FC::DB

Defined in:
lib/fc/db.rb

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.connect_blockObject

Returns the value of attribute connect_block.



7
8
9
# File 'lib/fc/db.rb', line 7

def connect_block
  @connect_block
end

.err_counterObject

Returns the value of attribute err_counter.



7
8
9
# File 'lib/fc/db.rb', line 7

def err_counter
  @err_counter
end

.loggerObject

Returns the value of attribute logger.



7
8
9
# File 'lib/fc/db.rb', line 7

def logger
  @logger
end

.no_active_recordObject

Returns the value of attribute no_active_record.



7
8
9
# File 'lib/fc/db.rb', line 7

def no_active_record
  @no_active_record
end

.optionsObject

Returns the value of attribute options.



7
8
9
# File 'lib/fc/db.rb', line 7

def options
  @options
end

.prefixObject

Returns the value of attribute prefix.



7
8
9
# File 'lib/fc/db.rb', line 7

def prefix
  @prefix
end

.reconnect_blockObject

Returns the value of attribute reconnect_block.



7
8
9
# File 'lib/fc/db.rb', line 7

def reconnect_block
  @reconnect_block
end

.reconnectingObject

Returns the value of attribute reconnecting.



7
8
9
# File 'lib/fc/db.rb', line 7

def reconnecting
  @reconnecting
end

Class Method Details

.closeObject



101
102
103
104
105
106
107
108
109
110
111
# File 'lib/fc/db.rb', line 101

def self.close
  if @options[:multi_threads]
    if @connects[Thread.current.object_id]
      @connects[Thread.current.object_id].close if @connects[Thread.current.object_id]
      @connects.delete(Thread.current.object_id)
    end
  else
    @connects.first[1].close if @connects.first
    @connects.clear
  end
end

.connectObject



69
70
71
72
73
74
75
76
77
78
# File 'lib/fc/db.rb', line 69

def self.connect
  connect_by_block if @connect_block
  return nil unless @options
  connect_by_config(@options) if @reconnecting || @options[:multi_threads] && !@connects[Thread.current.object_id]
  if @options[:multi_threads] 
    @connects[Thread.current.object_id]
  else
    @connects.first && @connects.first[1]
  end
end

.connect!(options = {}) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/fc/db.rb', line 80

def self.connect!(options = {})
  close if @connects && @connects[Thread.current.object_id]
  if @connect_block
    connect_by_block(options)
  elsif options[:host] || options[:database] || options[:username] || options[:password]
    connect_by_config(options)
  elsif @options
    connect_by_config(@options.merge(symbolize_keys(options)))
  elsif !@no_active_record && defined?(ActiveRecord::Base) && ActiveRecord::Base.connection
    connect_by_active_record(options)
  else
    connect_by_yml(options)
  end
end

.connect_by_active_record(options = {}) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/fc/db.rb', line 33

def self.connect_by_active_record(options = {})
  if defined?(Octopus::Proxy) && ActiveRecord::Base.connection.is_a?(Octopus::Proxy)
    connection = ActiveRecord::Base.connection.select_connection.instance_variable_get(:@connection)
  else
    connection = ActiveRecord::Base.connection.instance_variable_get(:@connection)
  end
  @options = symbolize_keys(connection.query_options)
  @options.merge!(symbolize_keys(options))
  @prefix = @options[:prefix].to_s if @options[:prefix]
  @connect_block = nil
  @connects = {} unless @connects
  @connects[Thread.current.object_id] = connection
end

.connect_by_block(options = {}) ⇒ Object



55
56
57
58
59
60
61
62
# File 'lib/fc/db.rb', line 55

def self.connect_by_block(options = {})
  connection = @connect_block.call
  @options = connection.query_options.clone.merge(symbolize_keys(options))
  @prefix = @options[:prefix].to_s if @options[:prefix]
  @connects = {} unless @connects
  @connects[Thread.current.object_id] = connection
  @connect_block = nil unless @options[:keep_lazy_connection]
end

.connect_by_config(options) ⇒ Object



18
19
20
21
22
23
24
25
26
# File 'lib/fc/db.rb', line 18

def self.connect_by_config(options)
  @options = symbolize_keys(options)
  @prefix = @options[:prefix].to_s if @options[:prefix]
  connection = Mysql2::Client.new(@options)
  @reconnecting = false
  @connect_block = nil unless @options[:keep_lazy_connection]
  @connects = {} unless @connects
  @connects[Thread.current.object_id] = connection
end

.connect_by_yml(options = {}) ⇒ Object



28
29
30
31
# File 'lib/fc/db.rb', line 28

def self.connect_by_yml(options = {})
  db_options = symbolize_keys(Psych.load(File.read(options_yml_path)))
  connect_by_config(db_options.merge(symbolize_keys(options)))
end

.init_db(silent = false) ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
# File 'lib/fc/db.rb', line 153

def self.init_db(silent = false)
  FC::DB.query(%{
    CREATE TABLE #{@prefix}items (
      id bigint NOT NULL AUTO_INCREMENT,
      name varchar(1024) NOT NULL DEFAULT '',
      tag varchar(255) DEFAULT NULL,
      outer_id bigint DEFAULT NULL,
      policy_id int NOT NULL,
      dir tinyint(1) NOT NULL DEFAULT 0,
      size bigint NOT NULL DEFAULT 0,
      md5 varchar(32) DEFAULT NULL,
      status ENUM('new', 'ready', 'error', 'delete') NOT NULL DEFAULT 'new',
      time int DEFAULT NULL,
      copies int NOT NULL DEFAULT 0,
      PRIMARY KEY (id), UNIQUE KEY (name(255), policy_id), 
      KEY (outer_id), KEY (time, status), KEY (status, policy_id, copies), KEY (copies, status, policy_id)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci
  })
  proc_time = %{
    SET NEW.time = UNIX_TIMESTAMP();
  }
  FC::DB.query("CREATE TRIGGER fc_items_before_insert BEFORE INSERT on #{@prefix}items FOR EACH ROW BEGIN #{proc_time} END")
  FC::DB.query("CREATE TRIGGER fc_items_before_update BEFORE UPDATE on #{@prefix}items FOR EACH ROW BEGIN #{proc_time} END")
  
  FC::DB.query(%{
    CREATE TABLE #{@prefix}storages (
      id int NOT NULL AUTO_INCREMENT,
      name varchar(255) NOT NULL DEFAULT '',
      host varchar(255) NOT NULL DEFAULT '',
      path varchar(2048) NOT NULL DEFAULT '',
      url varchar(2048) NOT NULL DEFAULT '',
      size bigint NOT NULL DEFAULT 0,
      size_limit bigint NOT NULL DEFAULT 0,
      check_time int DEFAULT NULL,
      copy_storages varchar(2048) NOT NULL DEFAULT '',
      PRIMARY KEY (id), UNIQUE KEY (name), KEY (host)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci
  })
  proc = %{
    # update policy.create_storages on storage delete and update
    UPDATE #{@prefix}policies, 
      (SELECT #{@prefix}policies.id, GROUP_CONCAT(#{@prefix}storages.name ORDER BY FIND_IN_SET(#{@prefix}storages.name, create_storages)) as storages FROM #{@prefix}policies LEFT JOIN #{@prefix}storages ON 
        FIND_IN_SET(#{@prefix}storages.name, create_storages) GROUP BY #{@prefix}policies.id) as policy_create
    SET #{@prefix}policies.create_storages = policy_create.storages
    WHERE policy_create.id = #{@prefix}policies.id;
  }
  proc_update = %{
    IF OLD.name <> NEW.name THEN 
      #{proc}
    END IF;
  }
  FC::DB.query("CREATE TRIGGER fc_storages_after_delete AFTER DELETE on #{@prefix}storages FOR EACH ROW BEGIN #{proc} END")
  FC::DB.query("CREATE TRIGGER fc_storages_after_update AFTER UPDATE on #{@prefix}storages FOR EACH ROW BEGIN #{proc_update} END")
  
  FC::DB.query(%{
    CREATE TABLE #{@prefix}policies (
      id int NOT NULL AUTO_INCREMENT,
      name varchar(255) NOT NULL DEFAULT '',
      create_storages varchar(2048) NOT NULL DEFAULT '',
      copies int NOT NULL DEFAULT 0,
      PRIMARY KEY (id), UNIQUE KEY (name)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci
  })
  proc = %{
    # update policy.create_storages on policy change - guarantee valid policy.storages
    SELECT GROUP_CONCAT(name ORDER BY FIND_IN_SET(name, NEW.create_storages)) INTO @create_storages_list FROM #{@prefix}storages WHERE FIND_IN_SET(name, NEW.create_storages);
    SET NEW.create_storages = @create_storages_list;
  }
  FC::DB.query("CREATE TRIGGER fc_policies_before_insert BEFORE INSERT on #{@prefix}policies FOR EACH ROW BEGIN #{proc} END")
  FC::DB.query("CREATE TRIGGER fc_policies_before_update BEFORE UPDATE on #{@prefix}policies FOR EACH ROW BEGIN #{proc} END")
  
  FC::DB.query(%{
    CREATE TABLE #{@prefix}items_storages (
      id bigint NOT NULL AUTO_INCREMENT,
      item_id bigint DEFAULT NULL,
      storage_name varchar(255) DEFAULT NULL,
      status ENUM('new', 'copy', 'error', 'ready', 'delete') NOT NULL DEFAULT 'new',
      time int DEFAULT NULL,
      PRIMARY KEY (id), UNIQUE KEY (item_id, storage_name), KEY (storage_name), KEY (time, status), KEY (status, storage_name),          
      FOREIGN KEY (item_id) REFERENCES #{@prefix}items(id) ON UPDATE RESTRICT ON DELETE RESTRICT,
      FOREIGN KEY (storage_name) REFERENCES #{@prefix}storages(name) ON UPDATE RESTRICT ON DELETE RESTRICT
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci
  })
  proc = %{
    SELECT status, copies, size INTO @item_status, @item_copies, @item_size FROM #{@prefix}items WHERE id = NEW.item_id;
    SET @curr_copies = (SELECT count(*) FROM #{@prefix}items_storages WHERE item_id = NEW.item_id AND status <> 'delete');
    SET @curr_copies_ready = (SELECT count(*) FROM #{@prefix}items_storages WHERE item_id = NEW.item_id AND status = 'ready');
    # calc item.copies
    IF @curr_copies <> @item_copies THEN 
      UPDATE #{@prefix}items SET copies=@curr_copies WHERE id = NEW.item_id;
    END IF;
    # check error status
    IF @item_status <> 'new' AND @item_status <> 'delete' AND @curr_copies_ready = 0 THEN 
      UPDATE #{@prefix}items SET status='error' WHERE id = NEW.item_id;
    END IF;
    # check ready status
    IF @curr_copies_ready > 0 THEN 
      UPDATE #{@prefix}items SET status='ready' WHERE id = NEW.item_id;
    END IF;
  }
  proc_add = %{
    #{proc}
    UPDATE #{@prefix}storages SET size=size+@item_size WHERE name = NEW.storage_name;
  }
  proc_del = %{
    #{proc.gsub('NEW', 'OLD')}
    UPDATE #{@prefix}storages SET size=size-@item_size WHERE name = OLD.storage_name;
  }
  FC::DB.query("CREATE TRIGGER fc_items_storages_before_insert BEFORE INSERT on #{@prefix}items_storages FOR EACH ROW BEGIN #{proc_time} END")
  FC::DB.query("CREATE TRIGGER fc_items_storages_before_update BEFORE UPDATE on #{@prefix}items_storages FOR EACH ROW BEGIN #{proc_time} END")
  FC::DB.query("CREATE TRIGGER fc_items_storages_after_update AFTER UPDATE on #{@prefix}items_storages FOR EACH ROW BEGIN #{proc} END")
  FC::DB.query("CREATE TRIGGER fc_items_storages_after_insert AFTER INSERT on #{@prefix}items_storages FOR EACH ROW BEGIN #{proc_add} END")
  FC::DB.query("CREATE TRIGGER fc_items_storages_after_delete AFTER DELETE on #{@prefix}items_storages FOR EACH ROW BEGIN #{proc_del} END")
  
  FC::DB.query(%{
    CREATE TABLE #{@prefix}errors (
      id int NOT NULL AUTO_INCREMENT,
      item_id bigint DEFAULT NULL,
      item_storage_id bigint DEFAULT NULL,
      host varchar(255) DEFAULT NULL,
      message text DEFAULT NULL,
      time int DEFAULT NULL,
      PRIMARY KEY (id), KEY (item_id), KEY (item_storage_id), KEY (host), KEY (time)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci
  })
  FC::DB.query("CREATE TRIGGER fc_errors_before_insert BEFORE INSERT on #{@prefix}errors FOR EACH ROW BEGIN #{proc_time} END")
  
  FC::DB.query(%{
    CREATE TABLE #{@prefix}copy_rules (
      id int NOT NULL AUTO_INCREMENT,
      copy_storages varchar(2048) NOT NULL DEFAULT '',
      rule text DEFAULT NULL,
      PRIMARY KEY (id)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci
  })
  
  FC::DB.query(%{
    CREATE TABLE #{@prefix}vars (
      name varchar(255) NOT NULL DEFAULT '',
      val varchar(255) NOT NULL DEFAULT '',
      descr text DEFAULT NULL,
      time int DEFAULT NULL,
      PRIMARY KEY (name)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci
  })
  FC::DB.query("CREATE TRIGGER fc_vars_before_insert BEFORE INSERT on #{@prefix}vars FOR EACH ROW BEGIN #{proc_time} END")
  FC::DB.query("CREATE TRIGGER fc_vars_before_update BEFORE UPDATE on #{@prefix}vars FOR EACH ROW BEGIN #{proc_time} END")
  FC::DB.query("INSERT INTO #{@prefix}vars SET name='daemon_cycle_time', val='30', descr='time between global daemon checks and storages available checks'")
  FC::DB.query("INSERT INTO #{@prefix}vars SET name='daemon_global_wait_time', val='120', descr='time between runs global daemon if it does not running'")
  FC::DB.query("INSERT INTO #{@prefix}vars SET name='daemon_tasks_copy_group_limit', val='1000', descr='select limit for copy tasks'")
  FC::DB.query("INSERT INTO #{@prefix}vars SET name='daemon_tasks_delete_group_limit', val='10000', descr='select limit for delete tasks'")
  FC::DB.query("INSERT INTO #{@prefix}vars SET name='daemon_tasks_copy_threads_limit', val='10', descr='copy tasks threads count limit for one storage'")
  FC::DB.query("INSERT INTO #{@prefix}vars SET name='daemon_tasks_delete_threads_limit', val='10', descr='delete tasks threads count limit for one storage'")
  FC::DB.query("INSERT INTO #{@prefix}vars SET name='daemon_copy_tasks_per_host_limit', val='10', descr='copy tasks count limit for one host'")
  FC::DB.query("INSERT INTO #{@prefix}vars SET name='daemon_global_tasks_group_limit', val='1000', descr='select limit for create copy tasks'")
  FC::DB.query("INSERT INTO #{@prefix}vars SET name='daemon_global_error_items_ttl', val='86400', descr='ttl for items with error status before delete'")
  FC::DB.query("INSERT INTO #{@prefix}vars SET name='daemon_global_error_items_storages_ttl', val='86400', descr='ttl for items_storages with error status before delete'")
  FC::DB.query("INSERT INTO #{@prefix}vars SET name='daemon_restart_period', val='86400', descr='time between fc-daemon self restart'")
  
  FC::DB.migrations(silent)
end

.lazy_connect(&block) ⇒ Object



47
48
49
# File 'lib/fc/db.rb', line 47

def self.lazy_connect(&block)
  @connect_block = block
end

.lazy_reconnect(&block) ⇒ Object



51
52
53
# File 'lib/fc/db.rb', line 51

def self.lazy_reconnect(&block)
  @reconnect_block = block
end

.migrate_1Object



329
330
331
332
333
# File 'lib/fc/db.rb', line 329

def self.migrate_1
  FC::DB.query("ALTER TABLE #{@prefix}storages ADD COLUMN url_weight int NOT NULL DEFAULT 0")
  FC::DB.query("ALTER TABLE #{@prefix}storages ADD COLUMN write_weight int NOT NULL DEFAULT 0")
  FC::DB.query("INSERT INTO #{@prefix}vars SET name='daemon_copy_speed_per_host_limit', val='', descr='copy tasks speed limit for hosts, change via fc-manage copy_speed'")
end

.migrate_2Object



335
336
337
# File 'lib/fc/db.rb', line 335

def self.migrate_2
  FC::DB.query("ALTER TABLE #{@prefix}storages ADD COLUMN dc varchar(255) DEFAULT ''")
end

.migrate_3Object



339
340
341
342
# File 'lib/fc/db.rb', line 339

def self.migrate_3
  FC::DB.query("ALTER TABLE #{@prefix}items MODIFY COLUMN status ENUM('new', 'ready', 'error', 'delete', 'deferred_delete') NOT NULL DEFAULT 'new'")
  FC::DB.query("ALTER TABLE #{@prefix}policies ADD COLUMN delete_deferred_time int NOT NULL DEFAULT 0")
end

.migrate_4Object



344
345
346
# File 'lib/fc/db.rb', line 344

def self.migrate_4
  FC::DB.query("ALTER TABLE #{@prefix}storages ADD COLUMN auto_size bigint(20) DEFAULT 0")
end

.migrate_5Object



348
349
350
# File 'lib/fc/db.rb', line 348

def self.migrate_5
  FC::DB.query("ALTER TABLE #{@prefix}storages ADD COLUMN autosync_at bigint(11) DEFAULT 0")
end

.migrate_6Object



352
353
354
# File 'lib/fc/db.rb', line 352

def self.migrate_6
  FC::DB.query("ALTER TABLE #{@prefix}storages ADD COLUMN http_check_time int(11) DEFAULT 0")
end

.migrate_7Object



356
357
358
359
# File 'lib/fc/db.rb', line 356

def self.migrate_7
  FC::DB.query("INSERT IGNORE INTO #{@prefix}vars SET name='daemon_global_delete_limit', val='1000', descr='limits number of deleted items per query'")
  FC::DB.query("INSERT IGNORE INTO #{@prefix}vars SET name='daemon_global_delete_delay', val='1', descr='delay in seconds between items delete query'")
end

.migrations(silent = false) ⇒ Object



319
320
321
322
323
324
325
326
327
# File 'lib/fc/db.rb', line 319

def self.migrations(silent = false)
  next_version = FC::DB.query("SELECT val FROM #{FC::DB.prefix}vars WHERE name='db_version'").first['val'].to_i + 1 rescue 1
  while self.respond_to?("migrate_#{next_version}")
    puts "migrate to #{next_version}" unless silent
    self.send("migrate_#{next_version}")
    FC::DB.query("REPLACE #{FC::DB.prefix}vars SET val=#{next_version}, name='db_version'")
    next_version += 1
  end
end

.options_yml_pathObject



10
11
12
# File 'lib/fc/db.rb', line 10

def self.options_yml_path
  File.expand_path(File.dirname(__FILE__) + '../../../bin/db.yml')
end

.query(sql) ⇒ Object

connect.query with deadlock solution



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/fc/db.rb', line 114

def self.query(sql)
  t1 = Time.new.to_f
  r = FC::DB.connect.query(sql)
  t2 = Time.new.to_f
  @logger.debug(format('FC SQL (%.1fms) %s', (t2 - t1) * 1000, sql)) if @logger
  FC::DB.err_counter = 0
  r = r.each(:as => :hash) {} if r
  r
rescue Mysql2::Error => e
  raise e if e.message =~ /You have an error in your SQL syntax/
  FC::DB.err_counter = FC::DB.err_counter.to_i + 1
  if FC::DB.err_counter > 5
    FC::DB.err_counter = 0
    raise "Too many mysql errors, #{e.message}"
  elsif e.message =~ /Deadlock found when trying to get lock/
    msg = "#{e.message} - retry"
    @logger ? @logger.error(msg) : puts(msg)
    sleep 0.1
    query(sql)
  elsif e.message =~ /Lost connection to MySQL server during query/
    msg = "#{e.message} - reconnect"
    @logger ? @logger.error(msg) : puts(msg)
    FC::DB.connect.ping
    sleep 0.1
    query(sql)
  elsif @options[:reconnect]
    msg = "#{e.message} - reconnect"
    @logger ? @logger.info(msg) : puts(msg)
    reconnect
    query(sql)
  else
    raise e
  end
end

.reconnectObject



95
96
97
98
99
# File 'lib/fc/db.rb', line 95

def self.reconnect
  close if connect
  @reconnecting = true
  @reconnect_block ? reconnect_by_block : connect_by_config(@options)
end

.reconnect_by_blockObject



64
65
66
67
# File 'lib/fc/db.rb', line 64

def self.reconnect_by_block
  @connects[Thread.current.object_id] = @reconnect_block.call
  @reconnecting = false
end

.server_timeObject



149
150
151
# File 'lib/fc/db.rb', line 149

def self.server_time
  FC::DB.query("SELECT UNIX_TIMESTAMP() as curr_time").first['curr_time'].to_i
end

.symbolize_keys(options) ⇒ Object



14
15
16
# File 'lib/fc/db.rb', line 14

def self.symbolize_keys(options)
  options.each_with_object({}) { |el, memo| memo[el[0].to_sym] = el[1] }
end

.versionObject



315
316
317
# File 'lib/fc/db.rb', line 315

def self.version
  return 1
end