Class: Baykit::BayServer::Agent::GrandAgent
- Inherits:
-
Object
- Object
- Baykit::BayServer::Agent::GrandAgent
- Includes:
- Baykit::BayServer, Letters, Multiplexer, Signal, Docker, Taxi, Train, Util
- Defined in:
- lib/baykit/bayserver/agent/grand_agent.rb
Constant Summary collapse
- SELECT_TIMEOUT_SEC =
10
- CMD_OK =
0
- CMD_CLOSE =
1
- CMD_RELOAD_CERT =
2
- CMD_MEM_USAGE =
3
- CMD_SHUTDOWN =
4
- CMD_ABORT =
5
- CMD_CATCHUP =
6
Class Attribute Summary collapse
-
.agent_count ⇒ Object
readonly
Returns the value of attribute agent_count.
-
.agents ⇒ Object
readonly
Returns the value of attribute agents.
-
.listeners ⇒ Object
readonly
Returns the value of attribute listeners.
-
.max_agent_id ⇒ Object
readonly
Returns the value of attribute max_agent_id.
-
.max_ships ⇒ Object
readonly
Returns the value of attribute max_ships.
Instance Attribute Summary collapse
-
#aborted ⇒ Object
readonly
Returns the value of attribute aborted.
-
#agent_id ⇒ Object
readonly
Returns the value of attribute agent_id.
-
#anchorable ⇒ Object
readonly
Returns the value of attribute anchorable.
-
#command_receiver ⇒ Object
readonly
Returns the value of attribute command_receiver.
-
#job_multiplexer ⇒ Object
readonly
Returns the value of attribute job_multiplexer.
-
#last_timeout_check ⇒ Object
readonly
Returns the value of attribute last_timeout_check.
-
#letter_queue ⇒ Object
readonly
Returns the value of attribute letter_queue.
-
#letter_queue_lock ⇒ Object
readonly
Returns the value of attribute letter_queue_lock.
-
#max_inbound_ships ⇒ Object
readonly
Returns the value of attribute max_inbound_ships.
-
#net_multiplexer ⇒ Object
readonly
Returns the value of attribute net_multiplexer.
-
#postpone_queue ⇒ Object
readonly
Returns the value of attribute postpone_queue.
-
#postpone_queue_lock ⇒ Object
readonly
Returns the value of attribute postpone_queue_lock.
-
#recipient ⇒ Object
readonly
Returns the value of attribute recipient.
-
#send_wakeup_pipe ⇒ Object
readonly
Returns the value of attribute send_wakeup_pipe.
-
#spider_multiplexer ⇒ Object
readonly
Returns the value of attribute spider_multiplexer.
-
#spin_multiplexer ⇒ Object
readonly
Returns the value of attribute spin_multiplexer.
-
#taxi_multiplexer ⇒ Object
readonly
Returns the value of attribute taxi_multiplexer.
-
#timer_handlers ⇒ Object
readonly
Returns the value of attribute timer_handlers.
-
#unanchorable_transporters ⇒ Object
readonly
Returns the value of attribute unanchorable_transporters.
Instance Method Summary collapse
- #abort ⇒ Object
- #abort_agent ⇒ Object
- #add_command_receiver(rd) ⇒ Object
- #add_postpone(p) ⇒ Object
- #add_timer_handler(handler) ⇒ Object
- #catch_up ⇒ Object
- #count_postpone ⇒ Object
-
#initialize(agent_id, max_ships, anchorable) ⇒ GrandAgent
constructor
A new instance of GrandAgent.
- #inspect ⇒ Object
- #print_usage ⇒ Object
- #reload_cert ⇒ Object
- #remove_timer_handler(handler) ⇒ Object
- #req_catch_up ⇒ Object
- #req_shutdown ⇒ Object
- #run ⇒ Object
- #send_accepted_letter(st, client_rd, wakeup) ⇒ Object
- #send_closed_letter(st, wakeup) ⇒ Object
- #send_connected_letter(st, wakeup) ⇒ Object
- #send_error_letter(st, err, wakeup) ⇒ Object
- #send_read_letter(st, n, adr, wakeup) ⇒ Object
- #send_wrote_letter(st, n, wakeup) ⇒ Object
- #shutdown ⇒ Object
-
#start ⇒ Object
Custom methods.
- #to_s ⇒ Object
Constructor Details
#initialize(agent_id, max_ships, anchorable) ⇒ GrandAgent
Returns a new instance of GrandAgent.
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 87 def initialize (agent_id, max_ships, anchorable) @agent_id = agent_id @max_inbound_ships = max_ships @anchorable = anchorable @timer_handlers = [] @select_timeout_sec = SELECT_TIMEOUT_SEC @aborted = false @letter_queue = [] @letter_queue_lock = Mutex.new @postpone_queue = [] @postpone_queue_lock = Mutex.new @spider_multiplexer = SpiderMultiplexer.new(self, anchorable) @spin_multiplexer = SpinMultiplexer.new(self) @job_multiplexer = JobMultiplexer.new(self, anchorable) @taxi_multiplexer = TaxiMultiplexer.new(self) case BayServer.harbor.recipient when Harbor::RECIPIENT_TYPE_SPIDER @recipient = @spider_multiplexer when Harbor::RECIPIENT_TYPE_PIPE raise NotImplementedError.new end case BayServer.harbor.net_multiplexer when Harbor::MULTIPLEXER_TYPE_SPIDER @net_multiplexer = @spider_multiplexer when Harbor::MULTIPLEXER_TYPE_JOB @net_multiplexer = @job_multiplexer when Harbor::MULTIPLEXER_TYPE_PIGEON, Harbor::MULTIPLEXER_TYPE_SPIN, Harbor::MULTIPLEXER_TYPE_TAXI, Harbor::MULTIPLEXER_TYPE_TRAIN raise Sink.new("Multiplexer not supported: %s", Harbor.get_multiplexer_type_name(BayServer.harbor.net_multiplexer)) end @last_timeout_check = 0 end |
Class Attribute Details
.agent_count ⇒ Object (readonly)
Returns the value of attribute agent_count.
72 73 74 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 72 def agent_count @agent_count end |
.agents ⇒ Object (readonly)
Returns the value of attribute agents.
70 71 72 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 70 def agents @agents end |
.listeners ⇒ Object (readonly)
Returns the value of attribute listeners.
71 72 73 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 71 def listeners @listeners end |
.max_agent_id ⇒ Object (readonly)
Returns the value of attribute max_agent_id.
74 75 76 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 74 def max_agent_id @max_agent_id end |
.max_ships ⇒ Object (readonly)
Returns the value of attribute max_ships.
73 74 75 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 73 def max_ships @max_ships end |
Instance Attribute Details
#aborted ⇒ Object (readonly)
Returns the value of attribute aborted.
60 61 62 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 60 def aborted @aborted end |
#agent_id ⇒ Object (readonly)
Returns the value of attribute agent_id.
46 47 48 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 46 def agent_id @agent_id end |
#anchorable ⇒ Object (readonly)
Returns the value of attribute anchorable.
47 48 49 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 47 def anchorable @anchorable end |
#command_receiver ⇒ Object (readonly)
Returns the value of attribute command_receiver.
61 62 63 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 61 def command_receiver @command_receiver end |
#job_multiplexer ⇒ Object (readonly)
Returns the value of attribute job_multiplexer.
49 50 51 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 49 def job_multiplexer @job_multiplexer end |
#last_timeout_check ⇒ Object (readonly)
Returns the value of attribute last_timeout_check.
63 64 65 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 63 def last_timeout_check @last_timeout_check end |
#letter_queue ⇒ Object (readonly)
Returns the value of attribute letter_queue.
64 65 66 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 64 def letter_queue @letter_queue end |
#letter_queue_lock ⇒ Object (readonly)
Returns the value of attribute letter_queue_lock.
65 66 67 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 65 def letter_queue_lock @letter_queue_lock end |
#max_inbound_ships ⇒ Object (readonly)
Returns the value of attribute max_inbound_ships.
58 59 60 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 58 def max_inbound_ships @max_inbound_ships end |
#net_multiplexer ⇒ Object (readonly)
Returns the value of attribute net_multiplexer.
48 49 50 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 48 def net_multiplexer @net_multiplexer end |
#postpone_queue ⇒ Object (readonly)
Returns the value of attribute postpone_queue.
66 67 68 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 66 def postpone_queue @postpone_queue end |
#postpone_queue_lock ⇒ Object (readonly)
Returns the value of attribute postpone_queue_lock.
67 68 69 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 67 def postpone_queue_lock @postpone_queue_lock end |
#recipient ⇒ Object (readonly)
Returns the value of attribute recipient.
55 56 57 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 55 def recipient @recipient end |
#send_wakeup_pipe ⇒ Object (readonly)
Returns the value of attribute send_wakeup_pipe.
57 58 59 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 57 def send_wakeup_pipe @send_wakeup_pipe end |
#spider_multiplexer ⇒ Object (readonly)
Returns the value of attribute spider_multiplexer.
52 53 54 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 52 def spider_multiplexer @spider_multiplexer end |
#spin_multiplexer ⇒ Object (readonly)
Returns the value of attribute spin_multiplexer.
51 52 53 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 51 def spin_multiplexer @spin_multiplexer end |
#taxi_multiplexer ⇒ Object (readonly)
Returns the value of attribute taxi_multiplexer.
50 51 52 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 50 def taxi_multiplexer @taxi_multiplexer end |
#timer_handlers ⇒ Object (readonly)
Returns the value of attribute timer_handlers.
62 63 64 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 62 def timer_handlers @timer_handlers end |
#unanchorable_transporters ⇒ Object (readonly)
Returns the value of attribute unanchorable_transporters.
59 60 61 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 59 def unanchorable_transporters @unanchorable_transporters end |
Instance Method Details
#abort ⇒ Object
339 340 341 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 339 def abort BayLog.fatal("%s abort", self) end |
#abort_agent ⇒ Object
249 250 251 252 253 254 255 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 249 def abort_agent BayLog.info("%s abort", self) if BayServer.harbor.multi_core exit(1) end end |
#add_command_receiver(rd) ⇒ Object
285 286 287 288 289 290 291 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 285 def add_command_receiver(rd) @command_receiver = CommandReceiver.new() com_transporter = PlainTransporter.new(@net_multiplexer, @command_receiver, true, 8, false) @command_receiver.init(@agent_id, rd, com_transporter) @net_multiplexer.add_rudder_state(@command_receiver.rudder, RudderState.new(@command_receiver.rudder, com_transporter)) BayLog.info("ComRec=%s", @command_receiver) end |
#add_postpone(p) ⇒ Object
356 357 358 359 360 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 356 def add_postpone(p) @postpone_queue_lock.synchronize do @postpone_queue << p end end |
#add_timer_handler(handler) ⇒ Object
277 278 279 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 277 def add_timer_handler(handler) @timer_handlers << handler end |
#catch_up ⇒ Object
380 381 382 383 384 385 386 387 388 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 380 def catch_up BayLog.debug("%s catchUp", self) @postpone_queue_lock.synchronize do if not @postpone_queue.empty? r = @postpone_queue.shift r.run() end end end |
#count_postpone ⇒ Object
362 363 364 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 362 def count_postpone return @postpone_queue.length end |
#inspect ⇒ Object
134 135 136 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 134 def inspect return to_s end |
#print_usage ⇒ Object
263 264 265 266 267 268 269 270 271 272 273 274 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 263 def print_usage # print memory usage BayLog.info("%s MemUsage", self) BayLog.info(" Ruby version: %s", RUBY_VERSION) memsize = ObjectSpace.memsize_of_all # formatted by comma msize_comma = memsize.to_s.reverse.gsub(/(\d{3})(?=\d)/, '\\1,').reverse.then do |str| str[0] == ',' ? str[1..-1] : str end BayLog.info(" Total object size: %s bytes", msize_comma) MemUsage.get(@agent_id).print_usage(1) end |
#reload_cert ⇒ Object
344 345 346 347 348 349 350 351 352 353 354 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 344 def reload_cert GrandAgent.anchorable_port_map.values().each do |port| if port.secure() begin port.secure_docker.reload_cert() rescue => e BayLog.error_e(e) end end end end |
#remove_timer_handler(handler) ⇒ Object
281 282 283 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 281 def remove_timer_handler(handler) @timer_handlers.delete(handler) end |
#req_catch_up ⇒ Object
366 367 368 369 370 371 372 373 374 375 376 377 378 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 366 def req_catch_up BayLog.debug("%s Req catchUp", self) if count_postpone > 0 catch_up else begin @command_receiver.send_command_to_monitor(self, CMD_CATCHUP, false) rescue IOError => e BayLog.error_e(e) abort end end end |
#req_shutdown ⇒ Object
257 258 259 260 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 257 def req_shutdown @aborted = true @recipient.wakeup end |
#run ⇒ Object
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 147 def run BayLog.info(BayMessage.get(:MSG_RUNNING_GRAND_AGENT, self)) if @net_multiplexer.is_non_blocking BayLog.info("rec=%s", @command_receiver) @command_receiver.rudder.set_non_blocking end @net_multiplexer.req_read(@command_receiver.rudder) if @anchorable # Adds server socket channel of anchorable ports BayServer.anchorable_port_map.keys.each do |rd| if @net_multiplexer.is_non_blocking rd.set_non_blocking end @net_multiplexer.add_rudder_state(rd, RudderState.new(rd)) end end # Set up unanchorable channel =begin for ch in GrandAgent.unanchorable_port_map.keys() do port_dkr = GrandAgent.unanchorable_port_map[ch] tp = port_dkr.new_transporter(self, ch) @unanchorable_transporters[ch] = tp @non_blocking_handler.add_channel_listener(ch, tp) @non_blocking_handler.ask_to_start(ch) if !@anchorable @non_blocking_handler.ask_to_read(ch) end end =end busy = true begin while true test_busy = @net_multiplexer.is_busy if test_busy != busy busy = test_busy if busy @net_multiplexer.on_busy else @net_multiplexer.on_free end end if not @spin_multiplexer.is_empty # If "SpinHandler" is running, the select function does not block. received = @recipient.receive(false) @spin_multiplexer.process_data else received = @recipient.receive(true) end if @aborted BayLog.info("%s aborted by another thread", self) break; end if @spin_multiplexer.is_empty && @letter_queue.empty? # timed out # check per 10 seconds if Time.now.tv_sec - @last_timeout_check >= 10 ring end end while !@letter_queue.empty? let = nil @letter_queue_lock.synchronize do let = @letter_queue.shift end case let when AcceptedLetter on_accepted(let) when ConnectedLetter on_connected(let) when ReadLetter on_read(let) when WroteLetter on_wrote(let) when ClosedLetter on_closed(let) when ErrorLetter on_error(let) end end end # while rescue Exception => e BayLog.fatal_e(e, "Uncaught Error: %s", e) ensure BayLog.info("%s end", self) shutdown end end |
#send_accepted_letter(st, client_rd, wakeup) ⇒ Object
293 294 295 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 293 def send_accepted_letter(st, client_rd, wakeup) send_letter(AcceptedLetter.new(st, client_rd), wakeup) end |
#send_closed_letter(st, wakeup) ⇒ Object
308 309 310 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 308 def send_closed_letter(st, wakeup) send_letter(ClosedLetter.new(st), wakeup) end |
#send_connected_letter(st, wakeup) ⇒ Object
297 298 299 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 297 def send_connected_letter(st, wakeup) send_letter(ConnectedLetter.new(st), wakeup) end |
#send_error_letter(st, err, wakeup) ⇒ Object
312 313 314 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 312 def send_error_letter(st, err, wakeup) send_letter(ErrorLetter.new(st, err), wakeup) end |
#send_read_letter(st, n, adr, wakeup) ⇒ Object
300 301 302 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 300 def send_read_letter(st, n, adr, wakeup) send_letter(ReadLetter.new(st, n, adr), wakeup) end |
#send_wrote_letter(st, n, wakeup) ⇒ Object
304 305 306 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 304 def send_wrote_letter(st, n, wakeup) send_letter(WroteLetter.new(st, n), wakeup) end |
#shutdown ⇒ Object
316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 316 def shutdown BayLog.info("%s shutdown", self) if @aborted return end @aborted = true BayLog.debug("%s shutdown netMultiplexer", self) @net_multiplexer.shutdown() GrandAgent.listeners.each do |lis| lis.remove(@agent_id) end @command_receiver.end() GrandAgent.agents.delete(@agent_id) if BayServer.harbor.multi_core BayLog.debug("%s exit", self) exit(1) end @agent_id = -1 end |
#start ⇒ Object
Custom methods
141 142 143 144 145 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 141 def start Thread.new do run end end |
#to_s ⇒ Object
129 130 131 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 129 def to_s() return "agt#" + @agent_id.to_s end |