Top Level Namespace
Defined Under Namespace
Modules: FC
Classes: BaseThread, CheckThread, GlobalDaemonThread, TaskThread
Instance Method Summary
collapse
Instance Method Details
#check_tasks(type) ⇒ Object
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
# File 'lib/daemon.rb', line 65
# Queues item storages whose status equals +type+ as tasks for the local storages.
#
# Reads $storages, $tasks and $curr_tasks. Every matching record that is not
# already queued or running for this action is appended to
# $tasks[storage_name] as {:action => type, :item_storage => record}.
#
# @param type [Symbol] action to queue; its string form is used as the status filter
def check_tasks(type)
  storages_names = $storages.map { |storage| "'#{storage.name}'" }.join(',')
  cond = "storage_name in (#{storages_names}) AND status='#{type}'"
  queued = $tasks.map { |_name, storage_tasks| storage_tasks.select { |task| task[:action] == type } }.flatten
  running = $curr_tasks.select { |task| task[:action] == type }
  ids = (queued + running).map { |task| task[:item_storage].id }
  limit = FC::Var.get('daemon_global_tasks_group_limit', 1000).to_i
  # The leading space keeps the clause separated from the preceding status literal.
  cond << " AND id not in (#{ids.join(',')})" unless ids.empty?
  cond << " LIMIT #{limit}"
  FC::ItemStorage.where(cond).each do |item_storage|
    # Second line of defence in case the query returned an already known record.
    next if ids.include?(item_storage.id)
    $tasks[item_storage.storage_name] ||= []
    $tasks[item_storage.storage_name] << {:action => type, :item_storage => item_storage}
    $log.debug("task add: type=#{type}, item_storage=#{item_storage.id}")
  end
end
|
#colorize_string(str, color) ⇒ Object
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
# File 'lib/utils.rb', line 61
# Wraps +str+ in an ANSI colour escape sequence.
#
# @param str [Object] text to colourize (interpolated into the result)
# @param color [Symbol, String, Integer, nil] 'red', 'green', 'yellow' or
#   'pink' by name; any other value is converted with +to_i+ and used as the
#   numeric code. A nil/false colour returns +str+ untouched.
# @return [Object] the escaped string, or +str+ itself when no colour is given
def colorize_string(str, color)
  return str unless color
  named_codes = {'red' => 31, 'green' => 32, 'yellow' => 33, 'pink' => 35}
  code = named_codes.fetch(color.to_s) { color.to_i }
  "\e[#{code}m#{str}\e[0m"
end
|
#error(msg, options = {}) ⇒ Object
7
8
9
10
|
# File 'lib/daemon.rb', line 7
# Logs +msg+ through $log and saves it as an FC::Error record.
#
# @param msg [String] error text
# @param options [Hash] extra attributes for the record; :host and :message
#   are always overridden with the current host and +msg+
# @return [Object] whatever FC::Error#save returns
def error(msg, options = {})
  $log.error(msg)
  attributes = options.merge(:host => FC::Storage.curr_host, :message => msg)
  FC::Error.new(attributes).save
end
|
#human_to_size(size) ⇒ Object
33
34
35
36
37
38
39
40
41
|
# File 'lib/utils.rb', line 33
# Parses a human readable size such as "10", "1.5k" or "2 MB" into bytes.
#
# Only the first character of the unit is significant (k, m, g or t, case
# insensitive, powers of 1024); any other or missing unit means plain bytes.
#
# @param size [#to_s] value to parse
# @return [Integer, nil] size in bytes (fraction truncated), or nil when the
#   text does not start with a number
def human_to_size(size)
  multipliers = {'k' => 1024, 'm' => 1024**2, 'g' => 1024**3, 't' => 1024**4}
  # \A anchors at the start of the whole string; ^ would also accept a number
  # that merely starts a later line of multi-line input.
  parsed = /\A(\d+(\.\d+)?)\s*(.*)/.match(size.to_s)
  return nil unless parsed
  factor = multipliers[parsed[3].to_s.strip.downcase[0]]
  bytes = parsed[1].to_f
  bytes *= factor if factor
  bytes.to_i
end
|
#option_parser_init(descriptions, text) ⇒ Object
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
# File 'lib/utils.rb', line 1
# Builds an OptionParser from +descriptions+, parses ARGV in place and
# returns the collected option values.
#
# Each description supplies :short, :full, :text and :default; when :no_val
# is set the switch takes no argument. "-?/--help" and "-v/--version" are
# added at the end, and both print and exit.
#
# @param descriptions [Enumerable] pairs of option key and description hash
# @param text [String] banner text
# @return [Hash] option values by key, plus the parser under 'optparse'
def option_parser_init(descriptions, text)
  options = {}
  optparse = OptionParser.new do |opts|
    opts.banner = text
    opts.separator "Options:"
    descriptions.each do |key, desc|
      options[key] = desc[:default]
      value_suffix = desc[:no_val] ? '' : "=#{desc[:full].upcase}"
      opts.on("-#{desc[:short]}", "--#{desc[:full]}#{value_suffix}", desc[:text]) do |value|
        options[key] = value
      end
    end
    opts.on_tail("-?", "--help", "Show this message") do
      puts opts
      exit
    end
    opts.on_tail("-v", "--version", "Show version") do
      puts FC::VERSION
      exit
    end
  end
  optparse.parse!
  options['optparse'] = optparse
  options
end
|
#policies_add ⇒ Object
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
# File 'lib/manage/policies.rb', line 25
# Interactively creates a policy: prompts for name, create/copy storages and
# copies, keeps only storage names that exist in FC::Storage, shows a summary
# and saves the policy after a "y"/"yes" confirmation.
#
# Prints "Error: ..." and exits when building or saving the policy raises.
def policies_add
  puts "Add Policy"
  name = stdin_read_val('Name')
  create_storages = stdin_read_val('Create storages')
  copy_storages = stdin_read_val('Copy storages')
  copies = stdin_read_val('Copies').to_i
  storages = FC::Storage.where.map(&:name)
  # Drop entries that do not name an existing storage.
  create_storages = create_storages.split(',').select { |s| storages.member?(s.strip) }.join(',').strip
  copy_storages = copy_storages.split(',').select { |s| storages.member?(s.strip) }.join(',').strip
  begin
    policy = FC::Policy.new(:name => name, :create_storages => create_storages, :copy_storages => copy_storages, :copies => copies)
  rescue StandardError => e
    # StandardError only: Interrupt and SystemExit must not be reported as errors.
    puts "Error: #{e.message}"
    exit
  end
  puts [
    "\nPolicy",
    "Name: #{name}",
    "Create storages: #{create_storages}",
    "Copy storages: #{copy_storages}",
    "Copies: #{copies}"
  ].join("\n")
  # readline returns nil on EOF (Ctrl-D); to_s turns that into "not confirmed".
  s = Readline.readline("Continue? (y/n) ", false).to_s.strip.downcase
  puts ""
  if s == "y" || s == "yes"
    begin
      policy.save
    rescue StandardError => e
      puts "Error: #{e.message}"
      exit
    end
    puts "ok"
  else
    puts "Canceled."
  end
end
|
#policies_change ⇒ Object
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
# File 'lib/manage/policies.rb', line 75
# Interactively edits the policy returned by find_policy. Empty answers keep
# the current value; storage lists are filtered to names that exist in
# FC::Storage. Shows a summary and saves after a "y"/"yes" confirmation.
#
# Does nothing when find_policy returns nil/false.
def policies_change
  policy = find_policy
  return unless policy
  puts "Change policy ##{policy.id} #{policy.name}"
  name = stdin_read_val("Name (now #{policy.name})", true)
  create_storages = stdin_read_val("Create storages (now #{policy.create_storages})", true)
  copy_storages = stdin_read_val("Copy storages (now #{policy.copy_storages})", true)
  copies = stdin_read_val("Copies (now #{policy.copies})", true)
  storages = FC::Storage.where.map(&:name)
  create_storages = create_storages.split(',').select { |s| storages.member?(s.strip) }.join(',').strip unless create_storages.empty?
  copy_storages = copy_storages.split(',').select { |s| storages.member?(s.strip) }.join(',').strip unless copy_storages.empty?
  policy.name = name unless name.empty?
  policy.create_storages = create_storages unless create_storages.empty?
  policy.copy_storages = copy_storages unless copy_storages.empty?
  policy.copies = copies.to_i unless copies.empty?
  # The summary describes a policy (the header used to read "Storage").
  puts [
    "\nPolicy",
    "Name: #{policy.name}",
    "Create storages: #{policy.create_storages}",
    "Copy storages: #{policy.copy_storages}",
    "Copies: #{policy.copies}"
  ].join("\n")
  # readline returns nil on EOF (Ctrl-D); to_s turns that into "not confirmed".
  s = Readline.readline("Continue? (y/n) ", false).to_s.strip.downcase
  puts ""
  if s == "y" || s == "yes"
    begin
      policy.save
    rescue StandardError => e
      # StandardError only: Interrupt and SystemExit must not be reported as errors.
      puts "Error: #{e.message}"
      exit
    end
    puts "ok"
  else
    puts "Canceled."
  end
end
|
#policies_list ⇒ Object
1
2
3
4
5
6
7
8
9
10
|
# File 'lib/manage/policies.rb', line 1
# Prints one line per policy (id, name, create/copy storages, copies), or
# "No policies." when FC::Policy.where returns nothing.
def policies_list
  policies = FC::Policy.where
  if policies.size == 0
    # This lists policies, so the empty message must not talk about storages.
    puts "No policies."
  else
    policies.each do |policy|
      puts "##{policy.id} #{policy.name}, create storages: #{policy.create_storages}, copy storages: #{policy.copy_storages}, copies: #{policy.copies}"
    end
  end
end
|
#policies_rm ⇒ Object
62
63
64
65
66
67
68
69
70
71
72
73
|
# File 'lib/manage/policies.rb', line 62
# Deletes the policy returned by find_policy after a "y"/"yes" confirmation.
# Does nothing when find_policy returns nil/false.
def policies_rm
  policy = find_policy
  return unless policy
  # readline returns nil on EOF (Ctrl-D); to_s turns that into "not confirmed".
  s = Readline.readline("Continue? (y/n) ", false).to_s.strip.downcase
  puts ""
  if s == "y" || s == "yes"
    policy.delete
    puts "ok"
  else
    puts "Canceled."
  end
end
|
#policies_show ⇒ Object
12
13
14
15
16
17
18
19
20
21
22
23
|
# File 'lib/manage/policies.rb', line 12
# Prints the attributes of the policy returned by find_policy together with
# the number of rows in the FC::Item table that reference it.
# Does nothing when find_policy returns nil/false.
def policies_show
  policy = find_policy
  return unless policy
  sql = "SELECT count(*) as cnt FROM #{FC::Item.table_name} WHERE policy_id = #{policy.id}"
  count = FC::DB.query(sql).first['cnt']
  puts [
    "Policy",
    "ID: #{policy.id}",
    "Name: #{policy.name}",
    "Create storages: #{policy.create_storages}",
    "Copy storages: #{policy.copy_storages}",
    "Copies: #{policy.copies}",
    "Items: #{count}"
  ].join("\n")
end
|
#run_global_daemon ⇒ Object
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
# File 'lib/daemon.rb', line 18
# Decides whether this host should run the global daemon thread.
#
# Reads the 'global_daemon_host' var row. When the row is missing or older
# than 'daemon_global_wait_time' seconds, the current host claims the role and
# the row is re-read one second later. If the row then names this host, a
# GlobalDaemonThread is spawned unless one is alive; otherwise an existing
# thread is told to exit.
def run_global_daemon
  $log.debug('Run global daemon check')
  timeout = FC::Var.get('daemon_global_wait_time', 120).to_i
  sql = "SELECT #{FC::DB.prefix}vars.*, UNIX_TIMESTAMP() as curr_time FROM #{FC::DB.prefix}vars WHERE name='global_daemon_host'"
  r = FC::DB.query(sql).first
  if !r || r['curr_time'].to_i - r['time'].to_i > timeout
    $log.debug('Set global daemon host to current')
    FC::Var.set('global_daemon_host', FC::Storage.curr_host)
    # NOTE(review): the pause presumably lets a concurrent claim from another
    # host land before the re-read - confirm.
    sleep 1
    r = FC::DB.query(sql).first
  end
  if r['val'] == FC::Storage.curr_host
    if !$global_daemon_thread || !$global_daemon_thread.alive?
      $log.debug("spawn GlobalDaemonThread")
      $global_daemon_thread = GlobalDaemonThread.new(timeout)
    end
  elsif $global_daemon_thread
    # The owning host is stored in the 'val' column (compared above).
    $log.warn("Kill global daemon thread (new host = #{r['val']})")
    $global_daemon_thread.exit
  end
end
|
#run_tasks ⇒ Object
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
# File 'lib/daemon.rb', line 88
# Spawns TaskThread workers for every local storage.
#
# For each storage in $storages, dead threads are dropped from
# $tasks_threads[name], then enough threads are started to serve the queued
# tasks at 'daemon_global_tasks_per_thread' tasks per thread, without the
# total number of live threads exceeding 'daemon_global_tasks_threads_limit'.
def run_tasks
  $log.debug('Run tasks')
  $storages.each do |storage|
    $tasks_threads[storage.name] ||= []
    threads = $tasks_threads[storage.name]
    threads.delete_if { |thread| !thread.alive? }
    tasks_count = $tasks[storage.name] ? $tasks[storage.name].size : 0
    threads_count = threads.count
    max_threads = FC::Var.get('daemon_global_tasks_threads_limit', 3).to_i
    tasks_per_thread = FC::Var.get('daemon_global_tasks_per_thread', 10).to_i
    # A non-positive setting would make the division below produce NaN/Infinity.
    tasks_per_thread = 1 if tasks_per_thread < 1
    # Cap the wanted total first, then subtract what is already running, so the
    # limit applies to all threads and not only to the ones started now.
    wanted_threads = (tasks_count / tasks_per_thread.to_f).ceil
    wanted_threads = max_threads if wanted_threads > max_threads
    run_threads_count = wanted_threads - threads_count
    $log.debug("tasks_count: #{tasks_count}, threads_count: #{threads_count}, run_threads_count: #{run_threads_count}")
    # Integer#times is a no-op for zero or negative counts.
    run_threads_count.times do
      $log.debug("spawn TaskThread for #{storage.name}")
      threads << TaskThread.new(storage.name)
    end
  end
end
|
#show_current_host ⇒ Object
1
2
3
|
# File 'lib/manage/show.rb', line 1
# Prints the host name reported by FC::Storage.curr_host.
def show_current_host
  current_host = FC::Storage.curr_host
  puts "Current host: #{current_host}"
end
|
#show_errors ⇒ Object
14
15
16
17
18
19
20
21
22
23
24
|
# File 'lib/manage/show.rb', line 14
# Prints the most recent FC::Error records, newest first.
# The number of records comes from ARGV[2] (converted with to_i), default 10.
def show_errors
  limit = (ARGV[2] || 10).to_i
  errors = FC::Error.where("1 ORDER BY id desc LIMIT #{limit}")
  return puts("No errors.") if errors.size == 0
  errors.each do |err|
    puts "#{Time.at(err.time)} item_id: #{err.item_id}, item_storage_id: #{err.item_storage_id}, host: #{err.host}, message: #{err.message}"
  end
end
|
#show_global_daemon ⇒ Object
5
6
7
8
9
10
11
12
|
# File 'lib/manage/show.rb', line 5
# Prints which host holds the 'global_daemon_host' var and how many seconds
# ago it was last updated, or a "not running" message when the var row is
# missing or has no value.
def show_global_daemon
  r = FC::DB.query("SELECT #{FC::DB.prefix}vars.*, UNIX_TIMESTAMP() as curr_time FROM #{FC::DB.prefix}vars WHERE name='global_daemon_host'").first
  # The row does not exist until some daemon has claimed the role.
  if r && r['val']
    puts "Global daemon run on #{r['val']}\nLast run #{r['curr_time'].to_i - r['time'].to_i} seconds ago."
  else
    puts "Global daemon is not runnning."
  end
end
|
#show_host_info ⇒ Object
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
# File 'lib/manage/show.rb', line 26
# Prints, for every storage of a host, its size, size limit, UP/DOWN state,
# time since the last check and the item storage counts grouped by status.
# The host comes from ARGV[2], falling back to FC::Storage.curr_host.
def show_host_info
  host = ARGV[2] || FC::Storage.curr_host
  storages = FC::Storage.where("host = ?", host)
  return puts("No storages.") if storages.size == 0
  puts "Info for host #{host}"
  storages.each do |storage|
    counts = FC::DB.query("SELECT status, count(*) as cnt FROM #{FC::ItemStorage.table_name} WHERE storage_name='#{Mysql2::Client.escape(storage.name)}' GROUP BY status")
    report = "#{storage.name} #{size_to_human(storage.size)}/#{size_to_human(storage.size_limit)} "
    report += "#{storage.up? ? colorize_string('UP', :green) : colorize_string('DOWN', :red)}"
    report += " #{storage.check_time_delay} seconds ago" if storage.check_time
    report += "\n"
    counts.each { |row| report += " Items storages #{row['status']}: #{row['cnt']}\n" }
    puts report
  end
end
|
#show_items_info ⇒ Object
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
# File 'lib/manage/show.rb', line 47
# Prints item and item storage counts grouped by status, followed by the
# number of 'ready' items that have fewer copies than their policy asks for
# (to copy) and more copies than their policy asks for (to delete).
def show_items_info
  # Prints one " status: count" line per status found in the given table.
  print_status_counts = lambda do |table|
    FC::DB.query("SELECT status, count(*) as cnt FROM #{table} WHERE 1 GROUP BY status").each do |row|
      puts " #{row['status']}: #{row['cnt']}"
    end
  end

  puts "Items by status:"
  print_status_counts.call(FC::Item.table_name)
  puts "Items storages by status:"
  print_status_counts.call(FC::ItemStorage.table_name)

  to_copy = FC::DB.query("SELECT count(*) as cnt FROM #{FC::Item.table_name} as i, #{FC::Policy.table_name} as p WHERE i.policy_id = p.id AND i.copies > 0 AND i.copies < p.copies AND i.status = 'ready'").first['cnt']
  puts "Items to copy: #{to_copy}"
  to_delete = FC::DB.query("SELECT count(*) as cnt FROM #{FC::Item.table_name} as i, #{FC::Policy.table_name} as p WHERE i.policy_id = p.id AND i.copies > p.copies AND i.status = 'ready'").first['cnt']
  puts "Items to delete: #{to_delete}"
end
|
#size_to_human(size) ⇒ Object
25
26
27
28
29
30
31
|
# File 'lib/utils.rb', line 25
# Formats a byte count with a binary unit (B, KB, MB, GB, TB), using up to
# two decimals with trailing zeros removed, e.g. 1536 => "1.5KB".
#
# @param size [Numeric] size in bytes
# @return [String] "0" for zero, otherwise the scaled value plus unit; values
#   of 1024 TB and above are still expressed in TB
def size_to_human(size)
  return "0" if size == 0
  units = %w{B KB MB GB TB}
  scaled = size.to_f
  exponent = 0
  # Repeated division by 1024 is exact for floats and, unlike a logarithm,
  # cannot land on the wrong side of an exact power; the bound keeps the
  # exponent inside the unit list.
  while scaled >= 1024 && exponent < units.size - 1
    scaled /= 1024
    exponent += 1
  end
  ("%.2f" % scaled).sub(/\.?0*$/, units[exponent])
end
|
#stdin_read_val(name, can_empty = false) ⇒ Object
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
# File 'lib/utils.rb', line 43
# Prompts for a value with Readline until an acceptable answer is entered.
#
# The answer is stripped and lowercased. Empty answers are rejected unless
# +can_empty+ is true. When a block is given it is called with the answer; a
# truthy result is printed as an error message and the prompt repeats.
# On end of input (Ctrl-D) the program exits, because no further answer can
# arrive.
#
# @param name [String] prompt label
# @param can_empty [Boolean] whether an empty answer is accepted
# @yieldparam val [String] the normalized answer
# @yieldreturn [String, nil] error message, or nil/false when valid
# @return [String] the accepted, normalized answer
def stdin_read_val(name, can_empty = false)
  loop do
    line = Readline.readline("#{name}: ", false)
    if line.nil?
      # readline returns nil on EOF; calling strip on it used to raise.
      puts ""
      exit
    end
    val = line.strip.downcase
    if val.empty? && !can_empty
      puts "Input non empty #{name}."
      next
    end
    return val unless block_given?
    err = yield(val)
    return val unless err
    puts err
  end
end
|
#storages_add ⇒ Object
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
# File 'lib/manage/storages.rb', line 32
# Interactively creates a storage on the current host: prompts for name,
# path, url, size limit and copy id, normalizes the path to start and end
# with '/', measures the current size via storage.file_size(''), shows a
# summary and saves after a "y"/"yes" confirmation.
#
# Prints "Error: ..." and exits when building, measuring or saving raises.
def storages_add
  host = FC::Storage.curr_host
  puts "Add storage to host #{host}"
  name = stdin_read_val('Name')
  path = stdin_read_val('Path')
  url = stdin_read_val('Url')
  size_limit_input = stdin_read_val('Size limit') { |val| "Size limit not is valid size." unless human_to_size(val) }
  size_limit = human_to_size(size_limit_input)
  copy_id = stdin_read_val('Copy id').to_i
  begin
    path = path + '/' unless path[-1] == '/'
    path = '/' + path unless path[0] == '/'
    storage = FC::Storage.new(:name => name, :host => host, :path => path, :url => url, :size_limit => size_limit, :copy_id => copy_id)
    print "Calc current size.. "
    size = storage.file_size('')
    puts "ok"
  rescue StandardError => e
    # StandardError only: Interrupt and SystemExit must not be reported as errors.
    puts "Error: #{e.message}"
    exit
  end
  puts [
    "\nStorage",
    "Name: #{name}",
    "Host: #{host}",
    "Path: #{path}",
    "Url: #{url}",
    "Size: #{size_to_human size}",
    "Size limit: #{size_to_human size_limit}",
    "Copy id: #{copy_id}"
  ].join("\n")
  # readline returns nil on EOF (Ctrl-D); to_s turns that into "not confirmed".
  s = Readline.readline("Continue? (y/n) ", false).to_s.strip.downcase
  puts ""
  if s == "y" || s == "yes"
    storage.size = size
    begin
      storage.save
    rescue StandardError => e
      puts "Error: #{e.message}"
      exit
    end
    puts "ok"
  else
    puts "Canceled."
  end
end
|
#storages_change ⇒ Object
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
# File 'lib/manage/storages.rb', line 103
# Interactively edits the storage returned by find_storage. Empty answers
# keep the current value. A changed path is normalized to start and end with
# '/' and the storage size is re-measured via storage.file_size('').
# Shows a summary and saves after a "y"/"yes" confirmation.
#
# Does nothing when find_storage returns nil/false.
def storages_change
  storage = find_storage
  return unless storage
  puts "Change storage #{storage.name}"
  host = stdin_read_val("Host (now #{storage.host})", true)
  path = stdin_read_val("Path (now #{storage.path})", true)
  url = stdin_read_val("Url (now #{storage.url})", true)
  size_limit = stdin_read_val("Size (now #{size_to_human(storage.size_limit)})", true) do |val|
    "Size limit not is valid size." if !val.empty? && !human_to_size(val)
  end
  copy_id = stdin_read_val("Copy id (now #{storage.copy_id})", true)

  storage.host = host unless host.empty?
  if !path.empty? && path != storage.path
    path = path + '/' unless path[-1] == '/'
    path = '/' + path unless path[0] == '/'
    storage.path = path
    print "Calc current size.. "
    storage.size = storage.file_size('')
    puts "ok"
  end
  storage.url = url unless url.empty?
  storage.size_limit = human_to_size(size_limit) unless size_limit.empty?
  # Like every other field, an empty answer keeps the current copy id
  # ("".to_i would silently reset it to 0).
  storage.copy_id = copy_id.to_i unless copy_id.empty?
  puts [
    "\nStorage",
    "Name: #{storage.name}",
    "Host: #{storage.host}",
    "Path: #{storage.path}",
    "Url: #{storage.url}",
    "Size: #{size_to_human storage.size}",
    "Size limit: #{size_to_human storage.size_limit}",
    # Show the value that will be saved, not the raw (possibly empty) answer.
    "Copy id: #{storage.copy_id}"
  ].join("\n")
  # readline returns nil on EOF (Ctrl-D); to_s turns that into "not confirmed".
  s = Readline.readline("Continue? (y/n) ", false).to_s.strip.downcase
  puts ""
  if s == "y" || s == "yes"
    begin
      storage.save
    rescue StandardError => e
      # StandardError only: Interrupt and SystemExit must not be reported as errors.
      puts "Error: #{e.message}"
      exit
    end
    puts "ok"
  else
    puts "Canceled."
  end
end
|
#storages_check ⇒ Object
47
48
49
50
51
52
53
54
55
56
57
58
59
|
# File 'lib/daemon.rb', line 47
# Starts a new round of storage checks.
#
# Any thread in $check_threads that is still alive is reported through
# +error+ as timed out and told to exit; then a fresh CheckThread is stored
# in $check_threads for every storage in $storages.
def storages_check
  $log.debug('Run storages check')
  $check_threads.each do |storage_name, thread|
    next unless thread.alive?
    error "Storage #{storage_name} check timeout"
    thread.exit
  end
  $storages.each do |storage|
    storage_name = storage.name
    $log.debug("spawn CheckThread for #{storage_name}")
    $check_threads[storage.name] = CheckThread.new(storage.name)
  end
end
|
#storages_list ⇒ Object
1
2
3
4
5
6
7
8
9
10
11
12
13
|
# File 'lib/manage/storages.rb', line 1
# Prints one line per storage, ordered by host: host, name, size/size limit,
# UP/DOWN state and, when a check time is known, seconds since the last check.
def storages_list
  storages = FC::Storage.where("1 ORDER BY host")
  return puts("No storages.") if storages.size == 0
  storages.each do |storage|
    line = "#{colorize_string(storage.host, :yellow)} #{storage.name} #{size_to_human(storage.size)}/#{size_to_human(storage.size_limit)} "
    line += "#{storage.up? ? colorize_string('UP', :green) : colorize_string('DOWN', :red)}"
    line += " #{storage.check_time_delay} seconds ago" if storage.check_time
    puts line
  end
end
|
#storages_rm ⇒ Object
75
76
77
78
79
80
81
82
83
84
85
86
|
# File 'lib/manage/storages.rb', line 75
# Deletes the storage returned by find_storage after a "y"/"yes" confirmation.
# Does nothing when find_storage returns nil/false.
def storages_rm
  storage = find_storage
  return unless storage
  # readline returns nil on EOF (Ctrl-D); to_s turns that into "not confirmed".
  s = Readline.readline("Continue? (y/n) ", false).to_s.strip.downcase
  puts ""
  if s == "y" || s == "yes"
    storage.delete
    puts "ok"
  else
    puts "Canceled."
  end
end
|
#storages_show ⇒ Object
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
# File 'lib/manage/storages.rb', line 15
# Prints the attributes of the storage returned by find_storage together with
# its check time, UP/DOWN state and the number of item storage rows that
# reference it by name. Does nothing when find_storage returns nil/false.
def storages_show
  storage = find_storage
  return unless storage
  sql = "SELECT count(*) as cnt FROM #{FC::ItemStorage.table_name} WHERE storage_name='#{Mysql2::Client.escape(storage.name)}'"
  count = FC::DB.query(sql).first['cnt']
  puts [
    "Storage",
    "Name: #{storage.name}",
    "Host: #{storage.host}",
    "Path: #{storage.path}",
    "Url: #{storage.url}",
    "Size: #{size_to_human storage.size}",
    "Size limit: #{size_to_human storage.size_limit}",
    "Copy id: #{storage.copy_id}",
    "Check time: #{storage.check_time ? "#{Time.at(storage.check_time)} (#{storage.check_time_delay} seconds ago)" : ''}",
    "Status: #{storage.up? ? colorize_string('UP', :green) : colorize_string('DOWN', :red)}",
    "Items storages: #{count}"
  ].join("\n")
end
|
#storages_sync ⇒ Object
156
157
158
159
160
161
162
163
164
165
166
167
168
169
|
# File 'lib/manage/storages.rb', line 156
# Synchronizes the storage returned by find_storage with the file system
# after a "y"/"yes" confirmation: runs make_storages_sync(storage, true) and
# then storages_update_size. Does nothing when find_storage returns nil/false.
def storages_sync
  storage = find_storage
  return unless storage
  print "Synchronize (#{storage.name}) storage and file system (#{storage.path})."
  # readline returns nil on EOF (Ctrl-D); to_s turns that into "not confirmed".
  s = Readline.readline("Continue? (y/n) ", false).to_s.strip.downcase
  puts ""
  if s == "y" || s == "yes"
    make_storages_sync(storage, true)
    puts "Synchronize done."
    storages_update_size
  else
    puts "Canceled."
  end
end
|
#storages_update_size ⇒ Object
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
# File 'lib/manage/storages.rb', line 88
# Re-measures the size of the storage returned by find_storage via
# storage.file_size('') and saves it. Prints "Error: ..." and exits when
# measuring or saving raises. Does nothing when find_storage returns nil/false.
def storages_update_size
  storage = find_storage
  return unless storage
  print "Calc current size.. "
  begin
    # Measuring is inside the rescue so a failure is reported the same way
    # storages_add reports it.
    storage.size = storage.file_size('')
    storage.save
  rescue StandardError => e
    # StandardError only: Interrupt and SystemExit must not be reported as errors.
    puts "Error: #{e.message}"
    exit
  end
  puts "ok"
end
|
#sync_info ⇒ Object
149
150
151
152
153
154
|
# File 'lib/manage/storages.rb', line 149
# Reports synchronization info for the storage returned by find_storage by
# running make_storages_sync(storage, false).
# Does nothing when find_storage returns nil/false.
def sync_info
  storage = find_storage
  return unless storage
  print "Synchronization info for (#{storage.name}) storage and file system (#{storage.path})."
  make_storages_sync(storage, false)
end
|
#update_storages ⇒ Object
41
42
43
44
45
|
# File 'lib/daemon.rb', line 41
# Reloads the storage lists: $all_storages gets every storage from
# FC::Storage.where, $storages the subset whose host equals
# FC::Storage.curr_host.
#
# @return [Array] the new value of $storages
def update_storages
  $log.debug('Update storages')
  $all_storages = FC::Storage.where
  local_storages = $all_storages.select do |storage|
    storage.host == FC::Storage.curr_host
  end
  $storages = local_storages
end
|
#update_tasks ⇒ Object
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
# File 'lib/daemon.rb', line 61
# Refreshes the task queues: queues pending :delete and then :copy item
# storages for the local storages. Returns early when $storages is empty.
def update_tasks
  $log.debug('Update tasks')
  return if $storages.length == 0

  # Queues item storages whose status equals +type+ that are not already
  # queued in $tasks or running in $curr_tasks for that action.
  # NOTE: a nested def (re)defines check_tasks as a regular top-level method
  # on every call; it is kept as a def so the method stays available to
  # other callers.
  def check_tasks(type)
    storages_names = $storages.map { |storage| "'#{storage.name}'" }.join(',')
    cond = "storage_name in (#{storages_names}) AND status='#{type}'"
    queued = $tasks.map { |_name, storage_tasks| storage_tasks.select { |task| task[:action] == type } }.flatten
    running = $curr_tasks.select { |task| task[:action] == type }
    ids = (queued + running).map { |task| task[:item_storage].id }
    limit = FC::Var.get('daemon_global_tasks_group_limit', 1000).to_i
    # The leading space keeps the clause separated from the preceding status literal.
    cond << " AND id not in (#{ids.join(',')})" unless ids.empty?
    cond << " LIMIT #{limit}"
    FC::ItemStorage.where(cond).each do |item_storage|
      next if ids.include?(item_storage.id)
      $tasks[item_storage.storage_name] ||= []
      $tasks[item_storage.storage_name] << {:action => type, :item_storage => item_storage}
      $log.debug("task add: type=#{type}, item_storage=#{item_storage.id}")
    end
  end

  check_tasks(:delete)
  check_tasks(:copy)
end
|
#var_change ⇒ Object
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
# File 'lib/manage/var.rb', line 21
# Interactively changes the value of the var returned by find_var: prompts
# for the new value, shows a summary and stores it with FC::Var.set after a
# "y"/"yes" confirmation. Does nothing when find_var returns nil/false.
def var_change
  var = find_var
  return unless var
  puts "Change var #{var['name']}"
  val = stdin_read_val("Value (now #{var['val']})")
  puts [
    "\nVar",
    "Name: #{var['name']}",
    "Value: #{val}"
  ].join("\n")
  # readline returns nil on EOF (Ctrl-D); to_s turns that into "not confirmed".
  s = Readline.readline("Continue? (y/n) ", false).to_s.strip.downcase
  puts ""
  if s == "y" || s == "yes"
    begin
      FC::Var.set(var['name'], val)
    rescue StandardError => e
      # StandardError only: Interrupt and SystemExit must not be reported as errors.
      puts "Error: #{e.message}"
      exit
    end
    puts "ok"
  else
    puts "Canceled."
  end
end
|
#var_list ⇒ Object
1
2
3
4
5
6
7
8
9
10
|
# File 'lib/manage/var.rb', line 1
# Prints the names of all vars that have a description, or "No vars." when
# the query returns nothing.
def var_list
  vars = FC::DB.query("SELECT * FROM #{FC::DB.prefix}vars WHERE descr IS NOT NULL")
  return puts("No vars.") if vars.size == 0
  vars.each { |var| puts var['name'] }
end
|
#var_show ⇒ Object
12
13
14
15
16
17
18
19
|
# File 'lib/manage/var.rb', line 12
# Prints name, value and description of the var returned by find_var.
# Does nothing when find_var returns nil/false.
def var_show
  var = find_var
  return unless var
  puts [
    "Var",
    "Name: #{var['name']}",
    "Value: #{var['val']}",
    "Description: #{var['descr']}"
  ].join("\n")
end
|