24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
# File 'lib/content_server/content_server.rb', line 24
# Starts the content server and then blocks joining its worker threads.
#
# In order, this method:
# * makes sure Params['tmp_path'] exists and sets $tmp_content_data_file to
#   'content.data' inside it;
# * initializes $local_content_data_lock, $local_content_data and
#   $last_content_data_id, loading content data from
#   Params['local_content_data_path'] when that path is an existing file,
#   otherwise creating the parent directory of that path if it is missing;
# * starts the worker threads: file monitoring, periodic content data flush,
#   the remote content server TCP thread, the file copy server threads and,
#   when Params['enable_monitoring'] is set, process vars monitoring;
# * marks every collected thread abort_on_exception and joins them all.
#
# @raise [RuntimeError] if Params['local_content_data_path'] is a directory
def run_content_server
  Log.info('Content server start')
  all_threads = []

  FileUtils.mkdir_p(Params['tmp_path']) unless File.directory?(Params['tmp_path'])
  $tmp_content_data_file = File.join(Params['tmp_path'], 'content.data')

  if Params['enable_monitoring']
    Log.info("Initializing monitoring of process params on port:%s", Params['process_monitoring_web_port'])
    $process_vars.set('server_name', 'content_server')
  end

  Log.info('Start monitoring following directories:')
  Params['monitoring_paths'].each do |path|
    Log.info(" Path:'%s'", path['path'])
  end

  # Shared state: the in-memory content data, its lock, and the id of the
  # content data as last loaded.
  $local_content_data_lock = Mutex.new
  $local_content_data = ContentData::ContentData.new
  $last_content_data_id = $local_content_data.unique_id

  content_data_path = Params['local_content_data_path']
  if File.directory?(content_data_path)
    # Build the message with format: passing the template and the value as
    # two arguments to raise makes Ruby treat the template as an exception
    # class and fail with a TypeError instead of reporting the real problem.
    raise format("Param:'local_content_data_path':'%s'cannot be a directory name", content_data_path)
  elsif File.exist?(content_data_path)
    Log.info("reading initial content data that exist from previous system run from file:%s", content_data_path)
    $local_content_data.from_file(content_data_path)
    $last_content_data_id = $local_content_data.unique_id
  else
    # No saved content data yet: make sure its parent directory exists.
    dir = File.dirname(content_data_path)
    FileUtils.mkdir_p(dir) unless File.exist?(dir)
  end

  Log.info('Init monitoring')
  fm = FileMonitoring::FileMonitoring.new
  all_threads << Thread.new do
    fm.monitor_files
  end

  Log.debug1('Init thread: flush local content data to file')
  all_threads << Thread.new do
    FileUtils.mkdir_p(Params['tmp_path']) unless File.directory?(Params['tmp_path'])
    loop do
      sleep(Params['data_flush_delay'])
      ContentServer.flush_content_data
    end
  end

  remote_content_client = RemoteContentServer.new(Params['local_content_data_port'])
  all_threads << remote_content_client.tcp_thread

  Log.debug1('Start copy data on demand')
  copy_files_events = Queue.new
  copy_server = FileCopyServer.new(copy_files_events, Params['local_files_port'])
  all_threads.concat(copy_server.run)

  if Params['enable_monitoring']
    # NOTE(review): the instance is never used afterwards; presumably it is
    # constructed for its side effects. Kept referenced to preserve the
    # original object lifetime -- confirm.
    _monitoring_info = MonitoringInfo::MonitoringInfo.new
    all_threads << Thread.new do
      ContentServer.monitor_general_process_vars
    end
  end

  # A failure in any worker thread should bring the whole process down.
  all_threads.each { |t| t.abort_on_exception = true }
  all_threads.each(&:join)
end
|