56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
|
# File 'lib/cpee/implementation.rb', line 56
def self::implementation(opts)
opts[:see_instances] ||= opts[:see_instances].nil? ? false : opts[:see_instances]
opts[:instances] ||= File.expand_path(File.join(__dir__,'..','..','server','instances'))
opts[:global_executionhandlers] ||= File.expand_path(File.join(__dir__,'..','..','server','executionhandlers'))
opts[:executionhandlers] ||= ''
opts[:topics] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','topics.xml'))
opts[:properties_init] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','properties.init'))
opts[:properties_empty] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','properties.empty'))
opts[:transformation_service] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','transformation.xml'))
opts[:empty_dslx] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','empty_dslx.xml'))
opts[:notifications_init] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','notifications'))
opts[:states] ||= File.expand_path(File.join(__dir__,'..','..','server','resources','states.xml'))
opts[:watchdog_frequency] ||= 7
opts[:watchdog_start_off] ||= false
opts[:infinite_loop_stop] ||= 10000
opts[:workers] ||= 1
opts[:workers_single] ||= ['end','persist','forward-votes']
opts[:workers_multi] ||= ['forward-events']
opts[:dashing_frequency] ||= 3
opts[:dashing_target] ||= nil
opts[:redis_path] ||= 'redis.sock'
opts[:redis_db] ||= 0
opts[:redis_url] ||= nil
opts[:redis_cmd] ||= 'redis-server --port 0 --unixsocket #redis_path# --unixsocketperm 600 --pidfile #redis_pid# --dir #redis_db_dir# --dbfilename #redis_db_name# --databases 1 --save 900 1 --save 300 10 --save 60 10000 --rdbcompression yes --daemonize yes'
opts[:redis_pid] ||= 'redis.pid'
opts[:redis_db_name] ||= 'redis.rdb'
CPEE::redis_connect opts, 'Server Main'
opts[:sse_keepalive_frequency] ||= 10
opts[:sse_connections] = {}
opts[:statemachine] = CPEE::StateMachine.new opts[:states], %w{running simulating replaying finishing stopping abandoned finished} do |id|
CPEE::Persistence::(id,opts,"state")
end
opts[:runtime_cmds] << [
"startclean", "Delete instances before starting.", Proc.new { |status|
Dir.glob(File.expand_path(File.join(opts[:instances],'*'))).each do |d|
FileUtils.rm_r(d) if File.basename(d) =~ /^\d+$/
end
}
]
Proc.new do
Dir[File.join(opts[:global_executionhandlers],'*','execution.rb')].each do |h|
require h
end unless opts[:global_executionhandlers].nil? || opts[:global_executionhandlers].strip == ''
Dir[File.join(opts[:executionhandlers],'*','execution.rb')].each do |h|
require h
end unless opts[:executionhandlers].nil? || opts[:executionhandlers].strip == ''
CPEE::Message::set_workers(opts[:workers])
parallel do
CPEE::watch_services(opts[:watchdog_start_off],opts[:redis_url],File.join(opts[:basepath],opts[:redis_path]),opts[:redis_db],opts[:workers],opts[:workers_single],opts[:workers_multi])
EM.add_periodic_timer(opts[:watchdog_frequency]) do
CPEE::watch_services(opts[:watchdog_start_off],opts[:redis_url],File.join(opts[:basepath],opts[:redis_path]),opts[:redis_db],opts[:workers],opts[:workers_single],opts[:workers_multi])
end
EM.defer do
CPEE::Notifications::sse_distributor(opts)
end
EM.add_periodic_timer(opts[:sse_keepalive_frequency]) do
CPEE::Notifications::sse_heartbeat(opts)
end
if opts[:dashing_target]
cpu_last = 0
idl_last = 0
EM.add_periodic_timer(opts[:dashing_frequency]) do
src = `cat /proc/stat | head -n 1`.split("\n")
srm = `cat /proc/meminfo`.split("\n")
sc = {}
sm = {}
src.each do |e|
x = e.split(' ')
sc[x[0]] = x[1..-1].map{|r| r.to_i}
end
srm.each do |e|
x = e.split(/\s+/)
sm[x[0].chop] = x[1].to_i
end
scc = 0
sci = 0
sc.each do |_,e|
scc = e[0..4].sum
sci = e[3]
end
cpu_delta = scc - cpu_last
cpu_idle = sci - idl_last
cpu_used = cpu_delta - cpu_idle
cpu_usage = '%.2f' % (100 * cpu_used / cpu_delta.to_f)
mem_tot = '%.1f' % (sm['MemTotal']/1024.0)
mem_fre = '%.1f' % (sm['MemFree']/1024.0)
mem_ava = '%.1f' % (sm['MemAvailable']/1024.0)
mem_buc = '%.1f' % ((sm['Buffers'] + sm['Cached'] + sm['SReclaimable'])/1024.0)
mem_usd = '%.1f' % ((sm['MemTotal'] - sm['MemFree'] - sm['Buffers'] - sm['Cached'] - sm['SReclaimable'])/1024.0)
content = {}
content['cpu_usage'] = cpu_usage
content['mem_total'] = mem_tot
content['mem_free'] = mem_fre
content['mem_available'] = mem_ava
content['mem_bufferedandcached'] = mem_buc
content['mem_used'] = mem_usd
CPEE::Message::send_url(:event,'node/resource_utilization',File.join(opts[:url],'/'),content,File.join(opts[:dashing_target],'/dash/events'))
idl_last = sci
cpu_last = scc
end
end
end
cleanup do
CPEE::cleanup_services(opts[:watchdog_start_off])
end
interface 'main' do
run CPEE::Instances, opts if get '*'
run CPEE::NewInstance, opts if post 'instance-new'
on resource '\d+' do |r|
run CPEE::Info, opts if get
run CPEE::DeleteInstance, opts if delete
end
end
interface 'properties' do |r|
id = r[:h]['RIDDL_DECLARATION_PATH'].split('/')[1].to_i
use CPEE::Properties::implementation(id.to_i, opts)
end
interface 'notifications' do |r|
id = r[:h]['RIDDL_DECLARATION_PATH'].split('/')[1].to_i
use CPEE::Notifications::implementation(id.to_i, opts)
end
interface 'callbacks' do |r|
id = r[:h]['RIDDL_DECLARATION_PATH'].split('/')[1].to_i
use CPEE::Callbacks::implementation(id.to_i, opts)
end
end
end
|