Class: Baykit::BayServer::Agent::GrandAgent
- Inherits: Object
  - Object
    - Baykit::BayServer::Agent::GrandAgent
- Includes:
- Baykit::BayServer, Signal, Taxi, Train, Util
- Defined in:
- lib/baykit/bayserver/agent/grand_agent.rb
Defined Under Namespace
Modules: GrandAgentLifecycleListener
Constant Summary collapse
- SELECT_TIMEOUT_SEC = 10
- CMD_OK = 0
- CMD_CLOSE = 1
- CMD_RELOAD_CERT = 2
- CMD_MEM_USAGE = 3
- CMD_SHUTDOWN = 4
- CMD_ABORT = 5
Class Attribute Summary collapse
-
.agent_count ⇒ Object
readonly
Returns the value of attribute agent_count.
-
.agents ⇒ Object
readonly
Returns the value of attribute agents.
-
.anchorable_port_map ⇒ Object
readonly
Returns the value of attribute anchorable_port_map.
-
.listeners ⇒ Object
readonly
Returns the value of attribute listeners.
-
.max_agent_id ⇒ Object
readonly
Returns the value of attribute max_agent_id.
-
.max_ships ⇒ Object
readonly
Returns the value of attribute max_ships.
-
.multi_core ⇒ Object
readonly
Returns the value of attribute multi_core.
-
.unanchorable_port_map ⇒ Object
readonly
Returns the value of attribute unanchorable_port_map.
Instance Attribute Summary collapse
-
#aborted ⇒ Object
readonly
Returns the value of attribute aborted.
-
#accept_handler ⇒ Object
readonly
Returns the value of attribute accept_handler.
-
#agent_id ⇒ Object
readonly
Returns the value of attribute agent_id.
-
#anchorable ⇒ Object
readonly
Returns the value of attribute anchorable.
-
#command_receiver ⇒ Object
readonly
Returns the value of attribute command_receiver.
-
#max_inbound_ships ⇒ Object
readonly
Returns the value of attribute max_inbound_ships.
-
#non_blocking_handler ⇒ Object
readonly
Returns the value of attribute non_blocking_handler.
-
#select_timeout_sec ⇒ Object
readonly
Returns the value of attribute select_timeout_sec.
-
#select_wakeup_pipe ⇒ Object
readonly
Returns the value of attribute select_wakeup_pipe.
-
#selector ⇒ Object
readonly
Returns the value of attribute selector.
-
#send_wakeup_pipe ⇒ Object
readonly
Returns the value of attribute send_wakeup_pipe.
-
#spin_handler ⇒ Object
readonly
Returns the value of attribute spin_handler.
-
#unanchorable_transporters ⇒ Object
readonly
Returns the value of attribute unanchorable_transporters.
Instance Method Summary collapse
- #abort_agent(err = nil, status = 1) ⇒ Object
-
#initialize(agent_id, max_ships, anchorable) ⇒ GrandAgent
constructor
A new instance of GrandAgent.
- #inspect ⇒ Object
- #print_usage ⇒ Object
- #reload_cert ⇒ Object
- #run ⇒ Object
- #run_command_receiver(com_channel) ⇒ Object
- #shutdown ⇒ Object
- #to_s ⇒ Object
- #wakeup ⇒ Object
Constructor Details
#initialize(agent_id, max_ships, anchorable) ⇒ GrandAgent
Returns a new instance of GrandAgent.
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 82
# Builds a new agent and the handlers it drives from its select loop.
#
# @param agent_id [Object] identifier stored in @agent_id (used by #to_s)
# @param max_ships [Object] stored as @max_inbound_ships; #run compares the
#   accept handler's channel count against it
# @param anchorable [Boolean] when truthy, an AcceptHandler bound to
#   GrandAgent.anchorable_port_map is created; otherwise @accept_handler is nil
def initialize(agent_id, max_ships, anchorable)
  @agent_id = agent_id
  @anchorable = anchorable

  # Only anchorable agents get an accept handler.
  @accept_handler = @anchorable ? AcceptHandler.new(self, GrandAgent.anchorable_port_map) : nil

  @spin_handler = SpinHandler.new(self)
  @non_blocking_handler = NonBlockingHandler.new(self)

  @select_timeout_sec = SELECT_TIMEOUT_SEC
  @max_inbound_ships = max_ships

  @selector = Selector.new
  @aborted = false
  @unanchorable_transporters = {}
end
Class Attribute Details
.agent_count ⇒ Object (readonly)
Returns the value of attribute agent_count.
61 62 63 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 61 def agent_count @agent_count end |
.agents ⇒ Object (readonly)
Returns the value of attribute agents.
59 60 61 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 59 def agents @agents end |
.anchorable_port_map ⇒ Object (readonly)
Returns the value of attribute anchorable_port_map.
62 63 64 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 62 def anchorable_port_map @anchorable_port_map end |
.listeners ⇒ Object (readonly)
Returns the value of attribute listeners.
60 61 62 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 60 def listeners @listeners end |
.max_agent_id ⇒ Object (readonly)
Returns the value of attribute max_agent_id.
65 66 67 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 65 def max_agent_id @max_agent_id end |
.max_ships ⇒ Object (readonly)
Returns the value of attribute max_ships.
64 65 66 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 64 def max_ships @max_ships end |
.multi_core ⇒ Object (readonly)
Returns the value of attribute multi_core.
66 67 68 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 66 def multi_core @multi_core end |
.unanchorable_port_map ⇒ Object (readonly)
Returns the value of attribute unanchorable_port_map.
63 64 65 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 63 def unanchorable_port_map @unanchorable_port_map end |
Instance Attribute Details
#aborted ⇒ Object (readonly)
Returns the value of attribute aborted.
55 56 57 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 55 def aborted @aborted end |
#accept_handler ⇒ Object (readonly)
Returns the value of attribute accept_handler.
48 49 50 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 48 def accept_handler @accept_handler end |
#agent_id ⇒ Object (readonly)
Returns the value of attribute agent_id.
44 45 46 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 44 def agent_id @agent_id end |
#anchorable ⇒ Object (readonly)
Returns the value of attribute anchorable.
45 46 47 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 45 def anchorable @anchorable end |
#command_receiver ⇒ Object (readonly)
Returns the value of attribute command_receiver.
56 57 58 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 56 def command_receiver @command_receiver end |
#max_inbound_ships ⇒ Object (readonly)
Returns the value of attribute max_inbound_ships.
52 53 54 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 52 def max_inbound_ships @max_inbound_ships end |
#non_blocking_handler ⇒ Object (readonly)
Returns the value of attribute non_blocking_handler.
46 47 48 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 46 def non_blocking_handler @non_blocking_handler end |
#select_timeout_sec ⇒ Object (readonly)
Returns the value of attribute select_timeout_sec.
51 52 53 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 51 def select_timeout_sec @select_timeout_sec end |
#select_wakeup_pipe ⇒ Object (readonly)
Returns the value of attribute select_wakeup_pipe.
50 51 52 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 50 def select_wakeup_pipe @select_wakeup_pipe end |
#selector ⇒ Object (readonly)
Returns the value of attribute selector.
53 54 55 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 53 def selector @selector end |
#send_wakeup_pipe ⇒ Object (readonly)
Returns the value of attribute send_wakeup_pipe.
49 50 51 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 49 def send_wakeup_pipe @send_wakeup_pipe end |
#spin_handler ⇒ Object (readonly)
Returns the value of attribute spin_handler.
47 48 49 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 47 def spin_handler @spin_handler end |
#unanchorable_transporters ⇒ Object (readonly)
Returns the value of attribute unanchorable_transporters.
54 55 56 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 54 def unanchorable_transporters @unanchorable_transporters end |
Instance Method Details
#abort_agent(err = nil, status = 1) ⇒ Object
215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 215
# Tears the agent down: ends the command receiver, tells every registered
# listener to remove this agent, and drops it from GrandAgent.agents.
#
# When BayServer.harbor.multi_core is truthy the process then exits with
# status 1; otherwise #clean is called and @aborted is set to true.
#
# @param err [Exception, nil] when given, it is logged at fatal level first
# @param status [Integer] accepted for interface compatibility.
#   NOTE(review): this value is not used -- the exit code is always 1.
#   #run passes 0 from its ensure clause on both the normal and the error
#   path, so honouring it would need a coordinated change in #run.
def abort_agent(err = nil, status = 1)
  if err
    BayLog.fatal("%s abort", self)
    BayLog.fatal_e(err)
  end

  # The receiver only exists after #run_command_receiver; tolerate an abort
  # that happens earlier instead of raising a second error from here.
  @command_receiver&.end()

  GrandAgent.listeners.each do |lis|
    lis.remove(self)
  end

  GrandAgent.agents.delete(@agent_id)

  if BayServer.harbor.multi_core
    # exit raises SystemExit, so nothing below runs in this branch.
    exit(1)
  else
    clean
  end

  @aborted = true
end
#inspect ⇒ Object
109 110 111 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 109
# Uses the same short label as #to_s when the agent is inspected.
#
# @return [String] whatever #to_s returns
def inspect
  to_s
end
#print_usage ⇒ Object
249 250 251 252 253 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 249
# Logs a header line for this agent and then asks the MemUsage object
# registered for @agent_id to print itself.
def print_usage
  # print memory usage
  BayLog.info("Agent#%d MemUsage", @agent_id)
  # The argument 1 is passed through unchanged; presumably an indent level --
  # confirm against MemUsage#print_usage.
  MemUsage.get(@agent_id).print_usage(1)
end
#reload_cert ⇒ Object
237 238 239 240 241 242 243 244 245 246 247 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 237
# Asks the secure docker of every secure port in
# GrandAgent.anchorable_port_map to reload its certificate.
#
# A failure on one port is logged through BayLog.error_e and does not stop
# the remaining ports from being processed.
def reload_cert
  GrandAgent.anchorable_port_map.values.each do |port|
    next unless port.secure

    begin
      port.secure_docker.reload_cert
    rescue => e
      BayLog.error_e(e)
    end
  end
end
#run ⇒ Object
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 113
# Main loop of the agent.
#
# Registers the wake-up pipe and the command receiver's channel with the
# selector, creates a transporter for every unanchorable channel, and then
# loops until @aborted is set or the selector is down to its two base
# registrations while the agent is not busy.
#
# Each iteration: updates the busy/free state of the accept handler, selects
# with a zero timeout while spin work is pending (otherwise
# @select_timeout_sec), dispatches every selected channel, and runs the
# timeout checks when nothing was processed.
#
# Any StandardError is logged and re-raised. On every exit path the end of
# the agent is logged and abort_agent(nil, 0) is called.
def run
  BayLog.info(BayMessage.get(:MSG_RUNNING_GRAND_AGENT, self))

  # The read end is watched by the selector; #wakeup writes to the other end.
  @select_wakeup_pipe = IO.pipe
  @selector.register(@select_wakeup_pipe[0], Selector::OP_READ)
  @selector.register(@command_receiver.communication_channel, Selector::OP_READ)

  # Set up unanchorable channel
  GrandAgent.unanchorable_port_map.keys.each do |ch|
    port_dkr = GrandAgent.unanchorable_port_map[ch]
    tp = port_dkr.new_transporter(self, ch)
    @unanchorable_transporters[ch] = tp
    @non_blocking_handler.add_channel_listener(ch, tp)
    @non_blocking_handler.ask_to_start(ch)
    @non_blocking_handler.ask_to_read(ch) unless @anchorable
  end

  busy = true
  begin
    until @aborted
      if @accept_handler
        # Notify the accept handler only when the busy state flips.
        test_busy = @accept_handler.ch_count >= @max_inbound_ships
        if test_busy != busy
          busy = test_busy
          if busy
            @accept_handler.on_busy
          else
            @accept_handler.on_free
          end
        end
      end

      # Two registrations are made unconditionally above (wake-up pipe and
      # command channel); presumably a count of 2 means no other channel is
      # left -- confirm against Selector#count.
      if !busy && @selector.count == 2
        # agent finished
        BayLog.debug("%s Selector has no key", self)
        break
      end

      # Do not block in select while spin work is pending.
      timeout = @spin_handler.empty? ? @select_timeout_sec : 0

      #@BayServer.debug("Selecting... read=" + read_list.to_s)
      selected_map = @selector.select(timeout)
      #BayLog.debug("%s selected: %s", self, selected_map)

      processed = @non_blocking_handler.register_channel_ops > 0

      if selected_map.length == 0
        # No channel is selected
        processed |= @spin_handler.process_data
      end

      selected_map.keys.each do |ch|
        if ch == @select_wakeup_pipe[0]
          # Waked up by ask_to_*
          on_waked_up(ch)
        elsif ch == @command_receiver.communication_channel
          @command_receiver.on_pipe_readable
        elsif @accept_handler && @accept_handler.server_socket?(ch)
          @accept_handler.on_acceptable(ch)
        else
          @non_blocking_handler.handle_channel(ch, selected_map[ch])
        end
        processed = true
      end

      unless processed
        # timeout check
        @non_blocking_handler.close_timeout_sockets
        @spin_handler.stop_timeout_spins
      end
    end
  rescue => e
    BayLog.error_e(e)
    raise e
  ensure
    BayLog.info("%s end", self)
    abort_agent(nil, 0)
  end
end
#run_command_receiver(com_channel) ⇒ Object
260 261 262 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 260
# Creates the CommandReceiver for this agent on the given channel and keeps
# it in @command_receiver.
#
# @param com_channel [Object] channel handed to CommandReceiver.new
# @return [CommandReceiver] the receiver that was just created
def run_command_receiver(com_channel)
  @command_receiver = CommandReceiver.new(self, com_channel)
end
#shutdown ⇒ Object
206 207 208 209 210 211 212 213 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 206
# Requests that the agent stop: shuts down the accept handler when there is
# one, sets @aborted so the loop in #run ends, and calls #wakeup.
def shutdown
  BayLog.debug("%s shutdown", self)

  # Agents created as non-anchorable have no accept handler.
  @accept_handler&.shutdown

  @aborted = true
  wakeup
end
#to_s ⇒ Object
104 105 106 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 104
# Short label for this agent.
#
# @return [String] "Agt#" followed by the agent id
def to_s
  "Agt##{@agent_id}"
end
#wakeup ⇒ Object
255 256 257 258 |
# File 'lib/baykit/bayserver/agent/grand_agent.rb', line 255
# Writes a 32-bit zero to the write end of the wake-up pipe. #run registers
# the read end with the selector, so this makes that channel readable.
def wakeup
  #BayLog.debug("%s wakeup", self)
  IOUtil.write_int32(@select_wakeup_pipe[1], 0)
end