Class: Riser::SocketThreadDispatcher
- Inherits:
-
Object
- Object
- Riser::SocketThreadDispatcher
- Defined in:
- lib/riser/server.rb
Instance Attribute Summary collapse
-
#thread_num ⇒ Object
Returns the value of attribute thread_num.
-
#thread_queue_polling_timeout_seconds ⇒ Object
Returns the value of attribute thread_queue_polling_timeout_seconds.
-
#thread_queue_size ⇒ Object
Returns the value of attribute thread_queue_size.
Instance Method Summary collapse
-
#accept(&block) ⇒ Object
:yields:.
-
#accept_return(&block) ⇒ Object
:yields:.
-
#at_stat(&block) ⇒ Object
:yields: stat_info.
-
#at_stat_get(&block) ⇒ Object
:yields: reset.
-
#at_stat_stop(&block) ⇒ Object
:yields:.
-
#at_stop(&block) ⇒ Object
:yields: stop_state.
-
#dispatch(&block) ⇒ Object
:yields: socket.
-
#dispose(&block) ⇒ Object
:yields:.
-
#initialize(thread_queue_name) ⇒ SocketThreadDispatcher
constructor
A new instance of SocketThreadDispatcher.
-
#postprocess(&block) ⇒ Object
:yields:.
-
#preprocess(&block) ⇒ Object
:yields:.
-
#signal_stat_get(reset: true) ⇒ Object
should be called from signal(2) handler.
-
#signal_stat_stop ⇒ Object
should be called from signal(2) handler.
-
#signal_stop_forced ⇒ Object
should be called from signal(2) handler.
-
#signal_stop_graceful ⇒ Object
should be called from signal(2) handler.
-
#start(_server_socket = nil) ⇒ Object
should be executed on the main thread sharing the stack with signal(2) handlers.
Constructor Details
#initialize(thread_queue_name) ⇒ SocketThreadDispatcher
Returns a new instance of SocketThreadDispatcher.
207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
# File 'lib/riser/server.rb', line 207 def initialize(thread_queue_name) @thread_num = nil @thread_queue_name = thread_queue_name @thread_queue_size = nil @thread_queue_polling_timeout_seconds = nil @at_stop = nil @at_stat = nil @at_stat_get = nil @at_stat_stop = nil @preprocess = nil @postprocess = nil @accept = nil @accept_return = nil @dispatch = nil @dispose = nil @stop_state = nil @stat_operation_queue = [] end |
Instance Attribute Details
#thread_num ⇒ Object
Returns the value of attribute thread_num.
226 227 228 |
# File 'lib/riser/server.rb', line 226 def thread_num @thread_num end |
#thread_queue_polling_timeout_seconds ⇒ Object
Returns the value of attribute thread_queue_polling_timeout_seconds.
228 229 230 |
# File 'lib/riser/server.rb', line 228 def thread_queue_polling_timeout_seconds @thread_queue_polling_timeout_seconds end |
#thread_queue_size ⇒ Object
Returns the value of attribute thread_queue_size.
227 228 229 |
# File 'lib/riser/server.rb', line 227 def thread_queue_size @thread_queue_size end |
Instance Method Details
#accept(&block) ⇒ Object
:yields:
260 261 262 263 |
# File 'lib/riser/server.rb', line 260 def accept(&block) # :yields: @accept = block nil end |
#accept_return(&block) ⇒ Object
:yields:
265 266 267 268 |
# File 'lib/riser/server.rb', line 265 def accept_return(&block) # :yields: @accept_return = block nil end |
#at_stat(&block) ⇒ Object
:yields: stat_info
235 236 237 238 |
# File 'lib/riser/server.rb', line 235 def at_stat(&block) # :yields: stat_info @at_stat = block nil end |
#at_stat_get(&block) ⇒ Object
:yields: reset
240 241 242 243 |
# File 'lib/riser/server.rb', line 240 def at_stat_get(&block) # :yields: reset @at_stat_get = block nil end |
#at_stat_stop(&block) ⇒ Object
:yields:
245 246 247 248 |
# File 'lib/riser/server.rb', line 245 def at_stat_stop(&block) # :yields: @at_stat_stop = block nil end |
#at_stop(&block) ⇒ Object
:yields: stop_state
230 231 232 233 |
# File 'lib/riser/server.rb', line 230 def at_stop(&block) # :yields: stop_state @at_stop = block nil end |
#dispatch(&block) ⇒ Object
:yields: socket
270 271 272 273 |
# File 'lib/riser/server.rb', line 270 def dispatch(&block) # :yields: socket @dispatch = block nil end |
#dispose(&block) ⇒ Object
:yields:
275 276 277 278 |
# File 'lib/riser/server.rb', line 275 def dispose(&block) # :yields: @dispose = block nil end |
#postprocess(&block) ⇒ Object
:yields:
255 256 257 258 |
# File 'lib/riser/server.rb', line 255 def postprocess(&block) # :yields: @postprocess = block nil end |
#preprocess(&block) ⇒ Object
:yields:
250 251 252 253 |
# File 'lib/riser/server.rb', line 250 def preprocess(&block) # :yields: @preprocess = block nil end |
#signal_stat_get(reset: true) ⇒ Object
should be called from signal(2) handler
295 296 297 298 299 300 301 302 303 |
# File 'lib/riser/server.rb', line 295 def signal_stat_get(reset: true) if (reset) then @stat_operation_queue << :get_and_reset else @stat_operation_queue << :get end nil end |
#signal_stat_stop ⇒ Object
should be called from signal(2) handler
306 307 308 309 |
# File 'lib/riser/server.rb', line 306 def signal_stat_stop @stat_operation_queue << :stop nil end |
#signal_stop_forced ⇒ Object
should be called from signal(2) handler
287 288 289 290 291 292 |
# File 'lib/riser/server.rb', line 287 def signal_stop_forced if (! @stop_state || @stop_state == :graceful) then @stop_state = :forced end nil end |
#signal_stop_graceful ⇒ Object
should be called from signal(2) handler
281 282 283 284 |
# File 'lib/riser/server.rb', line 281 def signal_stop_graceful @stop_state ||= :graceful nil end |
#start(_server_socket = nil) ⇒ Object
should be executed on the main thread sharing the stack with signal(2) handlers
_server_socket is a dummy argument to call like SocketProcessDispatcher#start.
337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 |
# File 'lib/riser/server.rb', line 337 def start(_server_socket=nil) error_lock = Mutex.new last_error = nil @preprocess.call begin queue = TimeoutSizedQueue.new(@thread_queue_size, name: @thread_queue_name) begin thread_list = [] @thread_num.times do |i| thread_list << Thread.start(i) {|thread_number| begin Thread.current[:number] = thread_number begin while (socket = queue.pop) begin @dispatch.call(socket) ensure socket.close unless socket.closed? end end ensure @dispose.call end rescue error_lock.synchronize{ last_error = $! } end } end catch (:end_of_server) { while (true) begin error_lock.synchronize{ last_error } and @stop_state = :forced @stop_state and throw(:end_of_server) apply_signal_stat(queue) socket = @accept.call end until (socket) until (queue.push(socket, @thread_queue_polling_timeout_seconds)) error_lock.synchronize{ last_error } and @stop_state = :forced if (@stop_state == :forced) then socket.close @accept_return.call throw(:end_of_server) end apply_signal_stat(queue) end @accept_return.call end } ensure queue.close end begin done = catch(:retry_stopping) { @at_stop.call(@stop_state) case (@stop_state) when :graceful until (thread_list.empty?) until (thread_list[0].join(@thread_queue_polling_timeout_seconds)) if (@stop_state == :forced) then throw(:retry_stopping) end end thread_list.shift end when :forced until (thread_list.empty?) thread_list[0].kill thread_list.shift end else raise "internal error: unknown stop state <#{@stop_state.inspect}>" end true } end until (done) ensure @postprocess.call end error_lock.synchronize{ if (last_error) then raise last_error end } nil end |