Class: Riser::SocketThreadDispatcher
- Inherits:
-
Object
- Object
- Riser::SocketThreadDispatcher
- Defined in:
- lib/riser/server.rb
Instance Attribute Summary collapse
-
#thread_num ⇒ Object
Returns the value of attribute thread_num.
-
#thread_queue_polling_timeout_seconds ⇒ Object
Returns the value of attribute thread_queue_polling_timeout_seconds.
-
#thread_queue_size ⇒ Object
Returns the value of attribute thread_queue_size.
Instance Method Summary collapse
-
#accept(&block) ⇒ Object
:yields:.
-
#accept_return(&block) ⇒ Object
:yields:.
-
#at_stat(&block) ⇒ Object
:yields: stat_info.
-
#at_stat_get(&block) ⇒ Object
:yields: reset.
-
#at_stat_stop(&block) ⇒ Object
:yields:.
-
#at_stop(&block) ⇒ Object
:yields: stop_state.
-
#dispatch(&block) ⇒ Object
:yields: socket.
-
#initialize(thread_queue_name) ⇒ SocketThreadDispatcher
constructor
A new instance of SocketThreadDispatcher.
-
#postprocess(&block) ⇒ Object
:yields:.
-
#preprocess(&block) ⇒ Object
:yields:.
-
#signal_stat_get(reset: true) ⇒ Object
should be called from signal(2) handler.
-
#signal_stat_stop ⇒ Object
should be called from signal(2) handler.
-
#signal_stop_forced ⇒ Object
should be called from signal(2) handler.
-
#signal_stop_graceful ⇒ Object
should be called from signal(2) handler.
-
#start(_server_socket = nil) ⇒ Object
should be executed on the main thread sharing the stack with signal(2) handlers.
Constructor Details
#initialize(thread_queue_name) ⇒ SocketThreadDispatcher
Returns a new instance of SocketThreadDispatcher.
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/riser/server.rb', line 177 def initialize(thread_queue_name) @thread_num = nil @thread_queue_name = thread_queue_name @thread_queue_size = nil @thread_queue_polling_timeout_seconds = nil @at_stop = nil @at_stat = nil @at_stat_get = nil @at_stat_stop = nil @preprocess = nil @postprocess = nil @accept = nil @accept_return = nil @dispatch = nil @stop_state = nil @stat_operation_queue = [] end |
Instance Attribute Details
#thread_num ⇒ Object
Returns the value of attribute thread_num.
195 196 197 |
# File 'lib/riser/server.rb', line 195 def thread_num @thread_num end |
#thread_queue_polling_timeout_seconds ⇒ Object
Returns the value of attribute thread_queue_polling_timeout_seconds.
197 198 199 |
# File 'lib/riser/server.rb', line 197 def thread_queue_polling_timeout_seconds @thread_queue_polling_timeout_seconds end |
#thread_queue_size ⇒ Object
Returns the value of attribute thread_queue_size.
196 197 198 |
# File 'lib/riser/server.rb', line 196 def thread_queue_size @thread_queue_size end |
Instance Method Details
#accept(&block) ⇒ Object
:yields:
229 230 231 232 |
# File 'lib/riser/server.rb', line 229 def accept(&block) # :yields: @accept = block nil end |
#accept_return(&block) ⇒ Object
:yields:
234 235 236 237 |
# File 'lib/riser/server.rb', line 234 def accept_return(&block) # :yields: @accept_return = block nil end |
#at_stat(&block) ⇒ Object
:yields: stat_info
204 205 206 207 |
# File 'lib/riser/server.rb', line 204 def at_stat(&block) # :yields: stat_info @at_stat = block nil end |
#at_stat_get(&block) ⇒ Object
:yields: reset
209 210 211 212 |
# File 'lib/riser/server.rb', line 209 def at_stat_get(&block) # :yields: reset @at_stat_get = block nil end |
#at_stat_stop(&block) ⇒ Object
:yields:
214 215 216 217 |
# File 'lib/riser/server.rb', line 214 def at_stat_stop(&block) # :yields: @at_stat_stop = block nil end |
#at_stop(&block) ⇒ Object
:yields: stop_state
199 200 201 202 |
# File 'lib/riser/server.rb', line 199 def at_stop(&block) # :yields: stop_state @at_stop = block nil end |
#dispatch(&block) ⇒ Object
:yields: socket
239 240 241 242 |
# File 'lib/riser/server.rb', line 239 def dispatch(&block) # :yields: socket @dispatch = block nil end |
#postprocess(&block) ⇒ Object
:yields:
224 225 226 227 |
# File 'lib/riser/server.rb', line 224 def postprocess(&block) # :yields: @postprocess = block nil end |
#preprocess(&block) ⇒ Object
:yields:
219 220 221 222 |
# File 'lib/riser/server.rb', line 219 def preprocess(&block) # :yields: @preprocess = block nil end |
#signal_stat_get(reset: true) ⇒ Object
should be called from signal(2) handler
257 258 259 260 261 262 263 264 265 |
# File 'lib/riser/server.rb', line 257 def signal_stat_get(reset: true) if (reset) then @stat_operation_queue << :get_and_reset else @stat_operation_queue << :get end nil end |
#signal_stat_stop ⇒ Object
should be called from signal(2) handler
268 269 270 271 |
# File 'lib/riser/server.rb', line 268 def signal_stat_stop @stat_operation_queue << :stop nil end |
#signal_stop_forced ⇒ Object
should be called from signal(2) handler
251 252 253 254 |
# File 'lib/riser/server.rb', line 251 def signal_stop_forced @stop_state ||= :forced nil end |
#signal_stop_graceful ⇒ Object
should be called from signal(2) handler
245 246 247 248 |
# File 'lib/riser/server.rb', line 245 def signal_stop_graceful @stop_state ||= :graceful nil end |
#start(_server_socket = nil) ⇒ Object
should be executed on the main thread sharing the stack with signal(2) handlers
_server_socket is a dummy argument to call like SocketProcessDispatcher#start.
301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 |
# File 'lib/riser/server.rb', line 301 def start(_server_socket=nil) error_lock = Mutex.new last_error = nil @preprocess.call begin queue = TimeoutSizedQueue.new(@thread_queue_size, name: @thread_queue_name) begin thread_list = [] @thread_num.times{|i| thread_list << Thread.new{ begin Thread.current[:number] = i while (socket = queue.pop) begin @dispatch.call(socket) ensure socket.close unless socket.closed? end end rescue error_lock.synchronize{ last_error = $! } end } } catch (:end_of_server) { while (true) begin error_lock.synchronize{ last_error } and @stop_state = :forced @stop_state and throw(:end_of_server) apply_signal_stat(queue) socket = @accept.call end until (socket) until (queue.push(socket, @thread_queue_polling_timeout_seconds)) error_lock.synchronize{ last_error } and @stop_state = :forced if (@stop_state == :forced) then socket.close @accept_return.call throw(:end_of_server) end apply_signal_stat(queue) end @accept_return.call end } ensure queue.close end @at_stop.call(@stop_state) case (@stop_state) when :graceful for thread in thread_list thread.join end when :forced for thread in thread_list thread.kill end else raise "internal error: unknown stop state <#{@stop_state.inspect}>" end ensure @postprocess.call end error_lock.synchronize{ if (last_error) then raise last_error end } nil end |