Class: Baykit::BayServer::Agent::NonBlockingHandler
- Inherits:
-
Object
- Object
- Baykit::BayServer::Agent::NonBlockingHandler
- Includes:
- Baykit::BayServer::Agent, Util
- Defined in:
- lib/baykit/bayserver/agent/non_blocking_handler.rb
Overview
Channel handler
Sockets or file descriptors are kinds of channel
Defined Under Namespace
Classes: ChannelOperation, ChannelState
Instance Attribute Summary collapse
-
#agent ⇒ Object
readonly
Returns the value of attribute agent.
-
#ch_count ⇒ Object
readonly
Returns the value of attribute ch_count.
-
#ch_map ⇒ Object
readonly
Returns the value of attribute ch_map.
-
#operations ⇒ Object
readonly
Returns the value of attribute operations.
-
#operations_lock ⇒ Object
readonly
Returns the value of attribute operations_lock.
Instance Method Summary collapse
- #add_channel_listener(ch, lis) ⇒ Object
- #ask_to_close(ch) ⇒ Object
- #ask_to_connect(ch, addr) ⇒ Object
- #ask_to_read(ch) ⇒ Object
- #ask_to_start(ch) ⇒ Object
- #ask_to_write(ch) ⇒ Object
- #close_all ⇒ Object
- #close_timeout_sockets ⇒ Object
- #handle_channel(ch, op) ⇒ Object
-
#initialize(ship_agent) ⇒ NonBlockingHandler
constructor
A new instance of NonBlockingHandler.
- #register_channel_ops ⇒ Object
- #to_s ⇒ Object
Constructor Details
#initialize(ship_agent) ⇒ NonBlockingHandler
Returns a new instance of NonBlockingHandler.
75 76 77 78 79 80 81 |
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 75 def initialize(ship_agent) @agent = ship_agent @ch_map = {} @ch_count = 0 @operations = [] @operations_lock = Monitor.new() end |
Instance Attribute Details
#agent ⇒ Object (readonly)
Returns the value of attribute agent.
69 70 71 |
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 69 def agent @agent end |
#ch_count ⇒ Object (readonly)
Returns the value of attribute ch_count.
71 72 73 |
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 71 def ch_count @ch_count end |
#ch_map ⇒ Object (readonly)
Returns the value of attribute ch_map.
70 71 72 |
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 70 def ch_map @ch_map end |
#operations ⇒ Object (readonly)
Returns the value of attribute operations.
72 73 74 |
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 72 def operations @operations end |
#operations_lock ⇒ Object (readonly)
Returns the value of attribute operations_lock.
73 74 75 |
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 73 def operations_lock @operations_lock end |
Instance Method Details
#add_channel_listener(ch, lis) ⇒ Object
269 270 271 272 273 274 |
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 269 def add_channel_listener(ch, lis) ch_state = ChannelState.new(ch, lis) add_channel_state(ch, ch_state) ch_state.access() return ch_state end |
#ask_to_close(ch) ⇒ Object
332 333 334 335 336 337 338 339 340 341 342 343 344 345 |
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 332 def ask_to_close(ch) ch_state = find_channel_state(ch) BayLog.debug("%s askToClose chState=%s", @agent, ch_state); if ch_state == nil BayLog.warn("%s channel state not found: %s", @agent, ch) return end ch_state.closing = true add_operation(ch, Selector::OP_WRITE, false, true) ch_state.access end |
#ask_to_connect(ch, addr) ⇒ Object
284 285 286 287 288 289 290 291 292 293 294 295 296 |
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 284 def ask_to_connect(ch, addr) ch_state = find_channel_state(ch) BayLog.debug("%s askToConnect addr=%s skt=%s chState=%s", @agent, addr, ch, ch_state) begin ch.connect_nonblock(addr) rescue IO::WaitWritable => e #BayLog.error_e(e) end ch_state.connecting = true add_operation(ch, Selector::OP_READ, true) end |
#ask_to_read(ch) ⇒ Object
298 299 300 301 302 303 304 305 306 307 308 309 310 311 |
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 298 def ask_to_read(ch) ch_state = find_channel_state(ch) BayLog.debug("%s askToRead chState=%s", @agent, ch_state); if ch.closed? raise IOError.new("Channel is closed") end add_operation(ch, Selector::OP_READ) if ch_state != nil ch_state.access() end end |
#ask_to_start(ch) ⇒ Object
276 277 278 279 280 281 282 |
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 276 def ask_to_start(ch) BayLog.debug("%s askToStart: ch=%s", @agent, ch) ch_state = find_channel_state(ch) ch_state.accepted = true end |
#ask_to_write(ch) ⇒ Object
313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 |
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 313 def ask_to_write(ch) ch_state = find_channel_state(ch) BayLog.debug("%s askToWrite chState=%s", @agent, ch_state); if ch.closed? BayLog.warn("%s Channel is closed: %s", @agent, ch) return end add_operation(ch, Selector::OP_WRITE) if ch_state == nil BayLog.error("Unknown socket (or closed)") return end ch_state.access() end |
#close_all ⇒ Object
347 348 349 350 351 352 |
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 347 def close_all() @ch_map.keys().each do |ch| st = find_channel_state(ch) close_channel(ch, st) end end |
#close_timeout_sockets ⇒ Object
247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 |
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 247 def close_timeout_sockets() if @ch_map.empty? return end close_list = [] now = DateTime.now @ch_map.values.each do |ch_state| if ch_state.listener != nil duration = ((now - ch_state.last_access_time) * 86400).to_i if ch_state.listener.check_timeout(ch_state.channel, duration) BayLog.debug("%s timeout: skt=%s", @agent, ch_state.channel) close_list << ch_state end end end close_list.each do |ch_state| close_channel ch_state.channel, ch_state end end |
#handle_channel(ch, op) ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 88 def handle_channel(ch, op) ch_state = find_channel_state(ch) if ch_state == nil BayLog.error("Cannot find fd state (Maybe file is closed)") @agent.selector.unregister(ch) return end next_action = nil begin if ch_state.closing next_action = NextSocketAction::CLOSE elsif ch_state.connecting ch_state.connecting = false # connectable next_action = ch_state.listener.on_connectable(ch) if next_action == nil raise Sink.new("unknown next action") elsif next_action == NextSocketAction::CONTINUE ask_to_read(ch) end else if op & Selector::OP_READ != 0 # readable next_action = ch_state.listener.on_readable(ch) if next_action == nil raise Sink.new("unknown next action") elsif next_action == NextSocketAction::WRITE op = @agent.selector.get_op(ch) op = op | Selector::OP_WRITE @agent.selector.modify(ch, op) end end if (next_action != NextSocketAction::CLOSE) && (op & Selector::OP_WRITE != 0) # writable next_action = ch_state.listener.on_writable(ch) if next_action == nil raise Sink.new("unknown next action") elsif next_action == NextSocketAction::READ # Handle as "Write Off" op = @agent.selector.get_op(ch) op = op & ~Selector::OP_WRITE if op == 0 @agent.selector.unregister(ch) else @agent.selector.modify(ch, op) end end end end if next_action == nil raise Sink.new("unknown next action") end rescue Sink => e raise e rescue => e if e.kind_of? EOFError BayLog.debug("%s Socket closed by peer: skt=%s", @agent, ch.inspect) elsif e.kind_of? SystemCallError BayLog.debug("%s O/S error: %s (skt=%s)", @agent, e., ch.inspect) elsif e.kind_of? IOError BayLog.debug("%s IO error: %s (skt=%s)", @agent, e., ch.inspect) elsif e.kind_of? OpenSSL::SSL::SSLError BayLog.debug("%s SSL error: %s (skt=%s)", @agent, e., ch.inspect) else BayLog.error("%s Unhandled error error: %s (skt=%s)", @agent, e, ch.inspect) throw e end # Cannot handle Exception any more ch_state.listener.on_error(ch, e) next_action = NextSocketAction::CLOSE end cancel = false ch_state.access() BayLog.trace("%s next=%d", ch_state, next_action) case next_action when NextSocketAction::CLOSE close_channel(ch, ch_state) cancel = false # already canceled in close_channel method when NextSocketAction::SUSPEND cancel = true when NextSocketAction::CONTINUE, NextSocketAction::READ, NextSocketAction::WRITE # do nothing else raise RuntimeError.new("IllegalState:: #{next_action}") end if cancel BayLog.trace("%s cancel key chState=%s", @agent, ch_state) @agent.selector.unregister(ch) end end |
#register_channel_ops ⇒ Object
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 |
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 194 def register_channel_ops() if @operations.empty? return 0 end @operations_lock.synchronize do nch = @operations.length @operations.each do |ch_op| st = self.find_channel_state(ch_op.ch) if ch_op.ch.closed? # Channel is closed before register operation BayLog.debug("%s Try to register closed socket (Ignore)", @agent) next end begin BayLog.trace("%s register op=%s chState=%s", @agent, self.class.op_mode(ch_op.op), st) op = @agent.selector.get_op(ch_op.ch) if op == nil @agent.selector.register(ch_op.ch, ch_op.op) else new_op = op | ch_op.op BayLog.debug("%s Already registered ch=%s op=%s update to %s", @agent, ch_op.ch, self.class.op_mode(op), self.class.op_mode(new_op)) @agent.selector.modify(ch_op.ch, new_op) end if ch_op.to_connect if st == nil BayLog.warn("%s register connect but ChannelState is null", @agent); else st.connecting = true end elsif ch_op.to_close if st == nil BayLog.warn("%s chState=%s register close but ChannelState", self.agent); else st.closing = true end end rescue => e cst = find_channel_state(ch_op.ch) BayLog.error_e(e, "%s Cannot register operation: %s", self.agent, cst != nil ? cst.listener : nil) end end @operations.clear() return nch end end |
#to_s ⇒ Object
84 85 86 |
# File 'lib/baykit/bayserver/agent/non_blocking_handler.rb', line 84 def to_s() return @agent.to_s() end |