Module: ContentServer
- Defined in:
- lib/content_server/backup_server.rb,
lib/content_server.rb,
lib/content_server/server.rb,
lib/content_server/version.rb,
lib/content_server/queue_copy.rb,
lib/content_server/file_streamer.rb,
lib/content_server/content_server.rb,
lib/content_server/remote_content.rb,
lib/content_server/content_receiver.rb
Overview
Content server. Monitors files, indexes local files, listens to backup server content, and copies changes and new files to the backup server.
Defined Under Namespace
Classes: ContentDataReceiver, ContentDataSender, FileCopyClient, FileCopyManager, FileCopyServer, FileReceiver, FileStreamer, RemoteContentClient, RemoteContentServer, Stream
Constant Summary
collapse
- VERSION =
"1.7.2"
Class Method Summary
collapse
Class Method Details
.flush_content_data ⇒ Object
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
# File 'lib/content_server/server.rb', line 92
# Persists the in-memory local content data to disk when it has changed since
# the previous flush.
#
# While holding $local_content_data_lock, the current
# $local_content_data.unique_id is compared with $last_content_data_id.
# When they differ, the id is recorded, the data is written to
# $tmp_content_data_file and that file is renamed to
# Params['local_content_data_path']. When they are equal nothing is written.
#
# Progress is reported through Log.debug1 and, when $testing_memory_active is
# set, through $testing_memory_log as well.
def flush_content_data
  # Mirrors a message to the memory-testing log only while that mode is on.
  memory_log = ->(text) { $testing_memory_log.info(text) if $testing_memory_active }

  Log.debug1('Start flush local content data to file.')
  memory_log.call('Start flush content data to file')

  $local_content_data_lock.synchronize do
    current_id = $local_content_data.unique_id
    if current_id == $last_content_data_id
      Log.debug1('no need to flush. content data has not changed')
      memory_log.call('no need to flush. content data has not changed')
    else
      $last_content_data_id = current_id
      # Write to the temporary file first, then move it over the final path.
      $local_content_data.to_file($tmp_content_data_file)
      File.rename($tmp_content_data_file, Params['local_content_data_path'])
      Log.debug1('End flush local content data to file.')
      memory_log.call('End flush content data to file')
    end
  end
end
|
.handle_program_termination(exception) ⇒ Object
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
# File 'lib/content_server/server.rb', line 37
# Reports the exception that is terminating the server.
#
# Builds a message with the exception class, its message and its backtrace,
# prints it to standard output and records it through Log.error.
#
# @param exception [Exception] the exception that caused the termination
def handle_program_termination(exception)
  # Exception#backtrace is nil for an exception that was never raised;
  # Array() turns that into an empty list so reporting cannot itself fail.
  backtrace = Array(exception.backtrace).join("\n")
  message = "\nInterrupt or Exit happened in server:''.\n" \
            "Exception type:'#{exception.class}'.\n" \
            "Exception message:'#{exception.message}'.\n" \
            "Stopping process.\n" \
            "Backtrace:\n#{backtrace}"
  puts(message)
  Log.error(message)
end
|
.init_globals ⇒ Object
Uses a module method for globals initialization due to the use of ‘Params’.
24
25
26
27
28
29
30
31
32
33
34
35
|
# File 'lib/content_server/server.rb', line 24
# Initializes the process-wide global variables used by the servers.
#
# $process_vars is created from Params['enable_monitoring']; the indexed file
# counter starts at 0, the memory-testing flag starts as false and every other
# global is reset to nil.
def init_globals
  $process_vars = ThreadSafeHash::ThreadSafeHashMonitored.new(Params['enable_monitoring'])
  $tmp_content_data_file = nil
  $testing_memory_active = false
  $testing_memory_log = nil
  $indexed_file_count = 0
  $local_content_data = nil
  $local_content_data_lock = nil
  $remote_content_data_lock = nil
  $remote_content_data = nil
  $last_content_data_id = nil
end
|
.monitor_general_process_vars ⇒ Object
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
# File 'lib/content_server/server.rb', line 58
# Endlessly samples live-object counts and publishes them.
#
# On every cycle, after sleeping Params['process_vars_delay'], the current
# time and the ObjectSpace counts of String, ContentData::ContentData,
# FileMonitoring::DirStat and FileMonitoring::FileStat objects are stored in
# $process_vars. The 'FileStat count' entry is published as the FileStat count
# minus the DirStat count, while the report below uses the raw FileStat count.
# A report of the change of each value since the previous cycle is then sent
# to Log.info.
#
# This method does not return on its own.
def monitor_general_process_vars
  previous_counts = { "Time" => Time.now.to_i }

  # `while true` is kept on purpose: Kernel#loop would rescue StopIteration.
  while true
    snapshot = {}
    sleep(Params['process_vars_delay'])

    now = Time.now
    $process_vars.set('time', now)
    snapshot['Time'] = now.to_i

    string_total = ObjectSpace.each_object(String).count
    $process_vars.set('String count', string_total)
    snapshot['String'] = string_total

    content_data_total = ObjectSpace.each_object(ContentData::ContentData).count
    $process_vars.set('ContentData count', content_data_total)
    snapshot['ContentData'] = content_data_total

    dir_stat_total = ObjectSpace.each_object(FileMonitoring::DirStat).count
    $process_vars.set('DirStat count', dir_stat_total)
    snapshot['DirStat'] = dir_stat_total

    file_stat_total = ObjectSpace.each_object(FileMonitoring::FileStat).count
    $process_vars.set('FileStat count', file_stat_total - dir_stat_total)
    snapshot['FileStat'] = file_stat_total

    # One line per sampled type, showing the change since the last cycle.
    # Types seen for the first time are compared against 0.
    report_lines = snapshot.map do |type, value|
      previous_counts[type] ||= 0
      delta = value - previous_counts[type]
      previous_counts[type] = value
      "Type:#{type} raised in:#{delta} \n"
    end
    Log.info("MEM REPORT:\n%s\n", report_lines.join)
  end
end
|
.run_backup_server ⇒ Object
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
# File 'lib/content_server/backup_server.rb', line 30
# Runs the backup server and blocks until all of its threads finish.
#
# Steps, in order:
# * prepares Params['tmp_path'] and sets $tmp_content_data_file inside it;
# * expands the first backup destination folder path and appends that folder
#   to Params['monitoring_paths'];
# * creates the local content data and loads it from
#   Params['local_content_data_path'] when that file exists;
# * starts the file monitoring thread, the periodic flush thread, the threads
#   returned by RemoteContentClient#run and the FileCopyClient threads;
# * starts a comparator thread that, every Params['backup_check_delay']
#   seconds, computes ContentData.remove($local_content_data,
#   $remote_content_data) and requests a copy of the result when it is not
#   empty;
# * optionally starts process-variable monitoring;
# * joins every thread.
#
# @raise [RuntimeError] when Params['local_content_data_path'] is a directory
def run_backup_server
  Log.info('Start backup server')
  Thread.abort_on_exception = true
  all_threads = []

  # Create the tmp dir if needed and choose the tmp content data file in it.
  FileUtils.mkdir_p(Params['tmp_path']) unless File.directory?(Params['tmp_path'])
  $tmp_content_data_file = File.join(Params['tmp_path'], 'backup.data')

  if Params['enable_monitoring']
    Log.info("Initializing monitoring of process params on port:%s", Params['process_monitoring_web_port'])
    $process_vars.set('server_name', 'backup_server')
  end

  # The backup destination folder is monitored together with the other paths.
  Params['backup_destination_folder'][0]['path'] = File.expand_path(Params['backup_destination_folder'][0]['path'])
  Log.info("backup_destination_folder is:%s", Params['backup_destination_folder'][0]['path'])
  Params['monitoring_paths'] << Params['backup_destination_folder'][0]
  Log.info('Start monitoring following directories:')
  Params['monitoring_paths'].each { |path|
    Log.info("  Path:'%s'", path['path'])
  }

  # Local content data: start empty, then load the previous run's file if any.
  $local_content_data_lock = Mutex.new
  $local_content_data = ContentData::ContentData.new
  $last_content_data_id = $local_content_data.unique_id
  content_data_path = Params['local_content_data_path']
  if File.exist?(content_data_path) && !File.directory?(content_data_path)
    Log.info("reading initial content data that exist from previous system run from file:%s", content_data_path)
    $local_content_data.from_file(content_data_path)
    $last_content_data_id = $local_content_data.unique_id
  else
    if File.directory?(content_data_path)
      # The message must be formatted before raising: Kernel#raise treats a
      # second positional argument as the message for an exception class.
      raise(format("Param:'local_content_data_path':'%s' cannot be a directory name", content_data_path))
    end
    dir = File.dirname(content_data_path)
    FileUtils.mkdir_p(dir) unless File.exist?(dir)
  end

  Log.info("Init monitoring")
  fm = FileMonitoring::FileMonitoring.new()
  all_threads << Thread.new do
    fm.monitor_files
  end

  Log.debug1('Init thread: flush local content data to file')
  all_threads << Thread.new do
    FileUtils.mkdir_p(Params['tmp_path']) unless File.directory?(Params['tmp_path'])
    loop {
      sleep(Params['data_flush_delay'])
      ContentServer.flush_content_data
    }
  end

  # Remote content data, filled by the remote content client.
  $remote_content_data_lock = Mutex.new
  $remote_content_data = ContentData::ContentData.new
  remote_content = ContentServer::RemoteContentClient.new(Params['content_server_hostname'],
                                                          Params['content_server_data_port'],
                                                          Params['backup_destination_folder'][0]['path'])
  all_threads.concat(remote_content.run())

  file_copy_client = FileCopyClient.new(Params['content_server_hostname'],
                                        Params['content_server_files_port'])
  all_threads.concat(file_copy_client.threads)

  Log.info('Start remote and local contents comparator')
  all_threads << Thread.new do
    loop do
      sleep(Params['backup_check_delay'])
      # Both locks are held while comparing; the local lock is taken first.
      $local_content_data_lock.synchronize {
        $remote_content_data_lock.synchronize {
          diff = ContentData.remove($local_content_data, $remote_content_data)
          unless diff.nil? || diff.empty?
            Log.debug2("Backup content:\n%s", $local_content_data)
            Log.debug2("Remote content:\n%s", $remote_content_data)
            Log.debug2("Missing contents:\n%s", diff)
            Log.info('Start sync check. Backup and remote contents need a sync, requesting copy files:')
            file_copy_client.request_copy(diff)
          else
            Log.info("Start sync check. Local and remote contents are equal. No sync required.")
          end
          # NOTE(review): presumably replaces the diff to release it early - confirm.
          diff = ContentData::ContentData.new
        }
      }
    end
  end

  if Params['enable_monitoring']
    # The instance is not used below; only its construction matters here.
    monitoring_info = MonitoringInfo::MonitoringInfo.new()
    all_threads << Thread.new do
      ContentServer.monitor_general_process_vars
    end
  end

  all_threads.each { |t| t.abort_on_exception = true }
  all_threads.each { |t| t.join }
end
|
.run_content_server ⇒ Object
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
# File 'lib/content_server/content_server.rb', line 24
# Runs the content server and blocks until all of its threads finish.
#
# Steps, in order:
# * prepares Params['tmp_path'] and sets $tmp_content_data_file inside it;
# * creates the local content data and loads it from
#   Params['local_content_data_path'] when that file exists;
# * starts the file monitoring thread and the periodic flush thread;
# * starts a RemoteContentServer on Params['local_content_data_port'] and a
#   FileCopyServer on Params['local_files_port'];
# * optionally starts process-variable monitoring;
# * joins every thread.
#
# @raise [RuntimeError] when Params['local_content_data_path'] is a directory
def run_content_server
  Log.info('Content server start')
  all_threads = []

  # Create the tmp dir if needed and choose the tmp content data file in it.
  FileUtils.mkdir_p(Params['tmp_path']) unless File.directory?(Params['tmp_path'])
  $tmp_content_data_file = File.join(Params['tmp_path'], 'content.data')

  if Params['enable_monitoring']
    Log.info("Initializing monitoring of process params on port:%s", Params['process_monitoring_web_port'])
    $process_vars.set('server_name', 'content_server')
  end

  Log.info('Start monitoring following directories:')
  Params['monitoring_paths'].each { |path|
    Log.info("  Path:'%s'", path['path'])
  }

  # Local content data: start empty, then load the previous run's file if any.
  $local_content_data_lock = Mutex.new
  $local_content_data = ContentData::ContentData.new
  $last_content_data_id = $local_content_data.unique_id
  content_data_path = Params['local_content_data_path']
  if File.exist?(content_data_path) && !File.directory?(content_data_path)
    Log.info("reading initial content data that exist from previous system run from file:%s", content_data_path)
    $local_content_data.from_file(content_data_path)
    $last_content_data_id = $local_content_data.unique_id
  else
    if File.directory?(content_data_path)
      # The message must be formatted before raising: Kernel#raise treats a
      # second positional argument as the message for an exception class.
      raise(format("Param:'local_content_data_path':'%s' cannot be a directory name", content_data_path))
    end
    dir = File.dirname(content_data_path)
    FileUtils.mkdir_p(dir) unless File.exist?(dir)
  end

  Log.info('Init monitoring')
  fm = FileMonitoring::FileMonitoring.new()
  all_threads << Thread.new do
    fm.monitor_files
  end

  Log.debug1('Init thread: flush local content data to file')
  all_threads << Thread.new do
    FileUtils.mkdir_p(Params['tmp_path']) unless File.directory?(Params['tmp_path'])
    loop {
      sleep(Params['data_flush_delay'])
      ContentServer.flush_content_data
    }
  end

  remote_content_server = RemoteContentServer.new(Params['local_content_data_port'])
  all_threads << remote_content_server.tcp_thread

  Log.debug1('Start copy data on demand')
  # The queue has to exist before it is handed to the copy server.
  copy_files_events = Queue.new
  copy_server = FileCopyServer.new(copy_files_events, Params['local_files_port'])
  all_threads.concat(copy_server.run())

  if Params['enable_monitoring']
    # The instance is not used below; only its construction matters here.
    monitoring_info = MonitoringInfo::MonitoringInfo.new()
    all_threads << Thread.new do
      ContentServer.monitor_general_process_vars
    end
  end

  all_threads.each { |t| t.abort_on_exception = true }
  all_threads.each { |t| t.join }
end
|