Class: Baykit::BayServer::Agent::GrandAgent
- Inherits:
-
Object
- Object
- Baykit::BayServer::Agent::GrandAgent
- Includes:
- Baykit::BayServer, Letters, Multiplexer, Signal, Common, Docker, Taxi, Train, Util
- Defined in:
- lib/baykit/bayserver/agent/grand_agent.rb
Constant Summary collapse
- SELECT_TIMEOUT_SEC =
10- CMD_OK =
0- CMD_CLOSE =
1- CMD_RELOAD_CERT =
2- CMD_MEM_USAGE =
3- CMD_SHUTDOWN =
4- CMD_ABORT =
5- CMD_CATCHUP =
6
Class Attribute Summary collapse
-
.agent_count ⇒ Object
readonly
Returns the value of attribute agent_count.
-
.agents ⇒ Object
readonly
Returns the value of attribute agents.
-
.listeners ⇒ Object
readonly
Returns the value of attribute listeners.
-
.max_agent_id ⇒ Object
readonly
Returns the value of attribute max_agent_id.
-
.max_ships ⇒ Object
readonly
Returns the value of attribute max_ships.
Instance Attribute Summary collapse
-
#aborted ⇒ Object
readonly
Returns the value of attribute aborted.
-
#agent_id ⇒ Object
readonly
Returns the value of attribute agent_id.
-
#anchorable ⇒ Object
readonly
Returns the value of attribute anchorable.
-
#command_receiver ⇒ Object
readonly
Returns the value of attribute command_receiver.
-
#job_multiplexer ⇒ Object
readonly
Returns the value of attribute job_multiplexer.
-
#last_timeout_check ⇒ Object
readonly
Returns the value of attribute last_timeout_check.
-
#letter_queue ⇒ Object
readonly
Returns the value of attribute letter_queue.
-
#letter_queue_lock ⇒ Object
readonly
Returns the value of attribute letter_queue_lock.
-
#max_inbound_ships ⇒ Object
readonly
Returns the value of attribute max_inbound_ships.
-
#net_multiplexer ⇒ Object
readonly
Returns the value of attribute net_multiplexer.
-
#postpone_queue ⇒ Object
readonly
Returns the value of attribute postpone_queue.
-
#postpone_queue_lock ⇒ Object
readonly
Returns the value of attribute postpone_queue_lock.
-
#recipient ⇒ Object
readonly
Returns the value of attribute recipient.
-
#spider_multiplexer ⇒ Object
readonly
Returns the value of attribute spider_multiplexer.
-
#spin_multiplexer ⇒ Object
readonly
Returns the value of attribute spin_multiplexer.
-
#taxi_multiplexer ⇒ Object
readonly
Returns the value of attribute taxi_multiplexer.
-
#timer_handlers ⇒ Object
readonly
Returns the value of attribute timer_handlers.
Class Method Summary collapse
- .add(agt_id, anchorable) ⇒ Object
- .add_lifecycle_listener(lis) ⇒ Object
- .get(agt_id) ⇒ Object
-
.init(agt_ids, max_ships) ⇒ Object
class methods.
Instance Method Summary collapse
- #abort ⇒ Object
- #abort_agent ⇒ Object
- #add_command_receiver(rd) ⇒ Object
- #add_postpone(p) ⇒ Object
- #add_timer_handler(handler) ⇒ Object
- #catch_up ⇒ Object
-
#initialize(agent_id, max_ships, anchorable) ⇒ GrandAgent
constructor
A new instance of GrandAgent.
- #inspect ⇒ Object
- #print_usage ⇒ Object
- #reload_cert ⇒ Object
- #remove_timer_handler(handler) ⇒ Object
- #req_catch_up ⇒ Object
- #req_shutdown ⇒ Object
- #run ⇒ Object
- #send_accepted_letter(state_id, rd, mpx, client_rd, wakeup) ⇒ Object
- #send_closed_letter(state_id, rd, mpx, wakeup) ⇒ Object
- #send_connected_letter(state_id, rd, mpx, wakeup) ⇒ Object
- #send_error_letter(state_id, rd, mpx, err, wakeup) ⇒ Object
- #send_read_letter(state_id, rd, mpx, n, adr, wakeup) ⇒ Object
- #send_wrote_letter(state_id, rd, mpx, n, wakeup) ⇒ Object
- #shutdown ⇒ Object
-
#start ⇒ Object
Custom methods.
- #to_s ⇒ Object
Constructor Details
#initialize(agent_id, max_ships, anchorable) ⇒ GrandAgent
Returns a new instance of GrandAgent.
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 86 def initialize (agent_id, max_ships, anchorable) @agent_id = agent_id @max_inbound_ships = max_ships @anchorable = anchorable @timer_handlers = [] @select_timeout_sec = SELECT_TIMEOUT_SEC @aborted = false @letter_queue = [] @letter_queue_lock = Mutex.new @postpone_queue = [] @postpone_queue_lock = Mutex.new @spider_multiplexer = SpiderMultiplexer.new(self, anchorable) @spin_multiplexer = SpinMultiplexer.new(self) @job_multiplexer = JobMultiplexer.new(self, anchorable) @taxi_multiplexer = TaxiMultiplexer.new(self) case BayServer.harbor.recipient when Harbor::RECIPIENT_TYPE_SPIDER @recipient = @spider_multiplexer when Harbor::RECIPIENT_TYPE_PIPE raise NotImplementedError.new end case BayServer.harbor.net_multiplexer when Harbor::MULTIPLEXER_TYPE_SPIDER @net_multiplexer = @spider_multiplexer when Harbor::MULTIPLEXER_TYPE_JOB @net_multiplexer = @job_multiplexer when Harbor::MULTIPLEXER_TYPE_PIGEON, Harbor::MULTIPLEXER_TYPE_SPIN, Harbor::MULTIPLEXER_TYPE_TAXI, Harbor::MULTIPLEXER_TYPE_TRAIN raise Sink.new("Multiplexer not supported: %s", Harbor.get_multiplexer_type_name(BayServer.harbor.net_multiplexer)) end @last_timeout_check = 0 end |
Class Attribute Details
.agent_count ⇒ Object (readonly)
Returns the value of attribute agent_count.
71 72 73 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 71 def agent_count @agent_count end |
.agents ⇒ Object (readonly)
Returns the value of attribute agents.
69 70 71 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 69 def agents @agents end |
.listeners ⇒ Object (readonly)
Returns the value of attribute listeners.
70 71 72 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 70 def listeners @listeners end |
.max_agent_id ⇒ Object (readonly)
Returns the value of attribute max_agent_id.
73 74 75 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 73 def max_agent_id @max_agent_id end |
.max_ships ⇒ Object (readonly)
Returns the value of attribute max_ships.
72 73 74 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 72 def max_ships @max_ships end |
Instance Attribute Details
#aborted ⇒ Object (readonly)
Returns the value of attribute aborted.
59 60 61 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 59 def aborted @aborted end |
#agent_id ⇒ Object (readonly)
Returns the value of attribute agent_id.
47 48 49 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 47 def agent_id @agent_id end |
#anchorable ⇒ Object (readonly)
Returns the value of attribute anchorable.
48 49 50 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 48 def anchorable @anchorable end |
#command_receiver ⇒ Object (readonly)
Returns the value of attribute command_receiver.
60 61 62 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 60 def command_receiver @command_receiver end |
#job_multiplexer ⇒ Object (readonly)
Returns the value of attribute job_multiplexer.
50 51 52 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 50 def job_multiplexer @job_multiplexer end |
#last_timeout_check ⇒ Object (readonly)
Returns the value of attribute last_timeout_check.
62 63 64 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 62 def last_timeout_check @last_timeout_check end |
#letter_queue ⇒ Object (readonly)
Returns the value of attribute letter_queue.
63 64 65 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 63 def letter_queue @letter_queue end |
#letter_queue_lock ⇒ Object (readonly)
Returns the value of attribute letter_queue_lock.
64 65 66 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 64 def letter_queue_lock @letter_queue_lock end |
#max_inbound_ships ⇒ Object (readonly)
Returns the value of attribute max_inbound_ships.
58 59 60 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 58 def max_inbound_ships @max_inbound_ships end |
#net_multiplexer ⇒ Object (readonly)
Returns the value of attribute net_multiplexer.
49 50 51 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 49 def net_multiplexer @net_multiplexer end |
#postpone_queue ⇒ Object (readonly)
Returns the value of attribute postpone_queue.
65 66 67 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 65 def postpone_queue @postpone_queue end |
#postpone_queue_lock ⇒ Object (readonly)
Returns the value of attribute postpone_queue_lock.
66 67 68 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 66 def postpone_queue_lock @postpone_queue_lock end |
#recipient ⇒ Object (readonly)
Returns the value of attribute recipient.
56 57 58 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 56 def recipient @recipient end |
#spider_multiplexer ⇒ Object (readonly)
Returns the value of attribute spider_multiplexer.
53 54 55 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 53 def spider_multiplexer @spider_multiplexer end |
#spin_multiplexer ⇒ Object (readonly)
Returns the value of attribute spin_multiplexer.
52 53 54 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 52 def spin_multiplexer @spin_multiplexer end |
#taxi_multiplexer ⇒ Object (readonly)
Returns the value of attribute taxi_multiplexer.
51 52 53 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 51 def taxi_multiplexer @taxi_multiplexer end |
#timer_handlers ⇒ Object (readonly)
Returns the value of attribute timer_handlers.
61 62 63 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 61 def timer_handlers @timer_handlers end |
Class Method Details
.add(agt_id, anchorable) ⇒ Object
630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 630 def self.add(agt_id, anchorable) if agt_id == -1 agt_id = @max_agent_id + 1 end BayLog.debug("Add agent: id=%d", agt_id) if agt_id > @max_agent_id @max_agent_id = agt_id end agt = GrandAgent.new(agt_id, @max_ships, anchorable) @agents[agt_id] = agt @listeners.each do |lis| lis.add(agt.agent_id) end return agt end |
.add_lifecycle_listener(lis) ⇒ Object
649 650 651 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 649 def GrandAgent.add_lifecycle_listener(lis) @listeners.append(lis) end |
.get(agt_id) ⇒ Object
626 627 628 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 626 def GrandAgent.get(agt_id) return @agents[agt_id] end |
.init(agt_ids, max_ships) ⇒ Object
class methods
615 616 617 618 619 620 621 622 623 624 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 615 def GrandAgent.init(agt_ids, max_ships) @agent_count = agt_ids.length @max_ships = max_ships if BayServer.harbor.multi_core agt_ids.each do | agt_id | add(agt_id, true) end end end |
Instance Method Details
#abort ⇒ Object
356 357 358 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 356 def abort BayLog.fatal("%s abort", self) end |
#abort_agent ⇒ Object
246 247 248 249 250 251 252 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 246 def abort_agent BayLog.info("%s abort", self) if BayServer.harbor.multi_core exit(1) end end |
#add_command_receiver(rd) ⇒ Object
282 283 284 285 286 287 288 289 290 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 282 def add_command_receiver(rd) @command_receiver = CommandReceiver.new() com_transporter = PlainTransporter.new(@net_multiplexer, @command_receiver, true, 8, false) @command_receiver.init(@agent_id, rd, com_transporter) st = RudderStateStore.get_store(@agent_id).rent st.init(@command_receiver.rudder, com_transporter) @net_multiplexer.add_rudder_state(@command_receiver.rudder, st) end |
#add_postpone(p) ⇒ Object
373 374 375 376 377 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 373 def add_postpone(p) @postpone_queue_lock.synchronize do @postpone_queue << p end end |
#add_timer_handler(handler) ⇒ Object
274 275 276 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 274 def add_timer_handler(handler) @timer_handlers << handler end |
#catch_up ⇒ Object
393 394 395 396 397 398 399 400 401 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 393 def catch_up BayLog.debug("%s catchUp", self) @postpone_queue_lock.synchronize do if not @postpone_queue.empty? r = @postpone_queue.shift r.run() end end end |
#inspect ⇒ Object
133 134 135 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 133 def inspect return to_s end |
#print_usage ⇒ Object
260 261 262 263 264 265 266 267 268 269 270 271 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 260 def print_usage # print memory usage BayLog.info("%s MemUsage", self) BayLog.info(" Ruby version: %s", RUBY_VERSION) memsize = ObjectSpace.memsize_of_all # formatted by comma msize_comma = memsize.to_s.reverse.gsub(/(\d{3})(?=\d)/, '\\1,').reverse.then do |str| str[0] == ',' ? str[1..-1] : str end BayLog.info(" Total object size: %s bytes", msize_comma) MemUsage.get(@agent_id).print_usage(1) end |
#reload_cert ⇒ Object
361 362 363 364 365 366 367 368 369 370 371 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 361 def reload_cert BayServer.anchorable_port_map.values().each do |port| if port.secure() begin port.secure_docker.reload_cert() rescue => e BayLog.error_e(e) end end end end |
#remove_timer_handler(handler) ⇒ Object
278 279 280 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 278 def remove_timer_handler(handler) @timer_handlers.delete(handler) end |
#req_catch_up ⇒ Object
379 380 381 382 383 384 385 386 387 388 389 390 391 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 379 def req_catch_up BayLog.debug("%s Req catchUp", self) if count_postpone > 0 catch_up else begin @command_receiver.send_command_to_monitor(self, CMD_CATCHUP, false) rescue IOError => e BayLog.error_e(e) abort end end end |
#req_shutdown ⇒ Object
254 255 256 257 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 254 def req_shutdown @aborted = true @recipient.wakeup end |
#run ⇒ Object
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 146 def run BayLog.info(BayMessage.get(:MSG_RUNNING_GRAND_AGENT, self)) if @net_multiplexer.is_non_blocking @command_receiver.rudder.set_non_blocking end @net_multiplexer.req_read(@command_receiver.rudder) if @anchorable # Adds server socket channel of anchorable ports BayServer.anchorable_port_map.keys.each do |rd| if @net_multiplexer.is_non_blocking rd.set_non_blocking end st = RudderStateStore.get_store(@agent_id).rent st.init(rd) @net_multiplexer.add_rudder_state(rd, st) end end # Set up unanchorable channel =begin for ch in GrandAgent.unanchorable_port_map.keys() do port_dkr = GrandAgent.unanchorable_port_map[ch] tp = port_dkr.new_transporter(self, ch) @unanchorable_transporters[ch] = tp @non_blocking_handler.add_channel_listener(ch, tp) @non_blocking_handler.ask_to_start(ch) if !@anchorable @non_blocking_handler.ask_to_read(ch) end end =end @net_multiplexer.on_free begin while true if not @spin_multiplexer.is_empty # If "SpinHandler" is running, the select function does not block. received = @recipient.receive(false) @spin_multiplexer.process_data else received = @recipient.receive(true) end if @aborted BayLog.info("%s aborted by another thread", self) break end if @spin_multiplexer.is_empty && @letter_queue.empty? # timed out # check per 10 seconds if Time.now.tv_sec - @last_timeout_check >= 10 ring end end while !@letter_queue.empty? let = nil @letter_queue_lock.synchronize do let = @letter_queue.shift end st = let.multiplexer.get_rudder_state(let.rudder) if st == nil BayLog.debug("%s rudder is already returned: %s", self, let.rudder) next end case let when AcceptedLetter on_accepted(let, st) when ConnectedLetter on_connected(let, st) when ReadLetter on_read(let, st) when WroteLetter on_wrote(let, st) when ClosedLetter on_closed(let, st) when ErrorLetter on_error(let, st) end end end # while rescue Exception => e BayLog.fatal_e(e, "Uncaught Error: %s", e) ensure BayLog.info("%s end", self) shutdown end end |
#send_accepted_letter(state_id, rd, mpx, client_rd, wakeup) ⇒ Object
292 293 294 295 296 297 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 292 def send_accepted_letter(state_id, rd, mpx, client_rd, wakeup) if rd == nil raise ArgumentError.new end send_letter(AcceptedLetter.new(state_id, rd, mpx, client_rd), wakeup) end |
#send_closed_letter(state_id, rd, mpx, wakeup) ⇒ Object
319 320 321 322 323 324 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 319 def send_closed_letter(state_id, rd, mpx, wakeup) if rd == nil raise ArgumentError.new end send_letter(ClosedLetter.new(state_id, rd, mpx), wakeup) end |
#send_connected_letter(state_id, rd, mpx, wakeup) ⇒ Object
299 300 301 302 303 304 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 299 def send_connected_letter(state_id, rd, mpx, wakeup) if rd == nil raise ArgumentError.new end send_letter(ConnectedLetter.new(state_id, rd, mpx), wakeup) end |
#send_error_letter(state_id, rd, mpx, err, wakeup) ⇒ Object
326 327 328 329 330 331 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 326 def send_error_letter(state_id, rd, mpx, err, wakeup) if rd == nil raise ArgumentError.new end send_letter(ErrorLetter.new(state_id, rd, mpx, err), wakeup) end |
#send_read_letter(state_id, rd, mpx, n, adr, wakeup) ⇒ Object
305 306 307 308 309 310 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 305 def send_read_letter(state_id, rd, mpx, n, adr, wakeup) if rd == nil raise ArgumentError.new end send_letter(ReadLetter.new(state_id, rd, mpx, n, adr), wakeup) end |
#send_wrote_letter(state_id, rd, mpx, n, wakeup) ⇒ Object
312 313 314 315 316 317 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 312 def send_wrote_letter(state_id, rd, mpx, n, wakeup) if rd == nil raise ArgumentError.new end send_letter(WroteLetter.new(state_id, rd, mpx, n), wakeup) end |
#shutdown ⇒ Object
333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 333 def shutdown BayLog.info("%s shutdown", self) if @aborted return end @aborted = true BayLog.debug("%s shutdown netMultiplexer", self) @net_multiplexer.shutdown() GrandAgent.listeners.each do |lis| lis.remove(@agent_id) end @command_receiver.end() GrandAgent.agents.delete(@agent_id) if BayServer.harbor.multi_core BayLog.debug("%s exit", self) exit(1) end @agent_id = -1 end |
#start ⇒ Object
Custom methods
140 141 142 143 144 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 140 def start Thread.new do run end end |
#to_s ⇒ Object
128 129 130 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 128 def to_s() return "agt#" + @agent_id.to_s end |