Class: Riser::SocketThreadDispatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/riser/server.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(thread_queue_name) ⇒ SocketThreadDispatcher

Returns a new instance of SocketThreadDispatcher.



177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/riser/server.rb', line 177

# Builds an unconfigured dispatcher.
#
# Only the queue name is supplied here. Every tuning value, every callback
# and the stop state start out as nil, and the list of pending stat
# operations starts out empty.
def initialize(thread_queue_name)
  @thread_num = nil
  @thread_queue_name = thread_queue_name

  # Everything below is unset until it is configured or registered later.
  %i[
    @thread_queue_size
    @thread_queue_polling_timeout_seconds
    @at_stop
    @at_stat
    @at_stat_get
    @at_stat_stop
    @preprocess
    @postprocess
    @accept
    @accept_return
    @dispatch
    @stop_state
  ].each { |ivar| instance_variable_set(ivar, nil) }

  # Filled by the signal_stat_* methods.
  @stat_operation_queue = []
end

Instance Attribute Details

#thread_num ⇒ Object

Returns the value of attribute thread_num.



195
196
197
# File 'lib/riser/server.rb', line 195

# Returns the current value of @thread_num (nil right after #initialize).
#
# #start calls +times+ on this value and creates one worker thread per
# iteration, so it must be set before #start is called.
def thread_num
  @thread_num
end

#thread_queue_polling_timeout_seconds ⇒ Object

Returns the value of attribute thread_queue_polling_timeout_seconds.



197
198
199
# File 'lib/riser/server.rb', line 197

# Returns the current value of @thread_queue_polling_timeout_seconds
# (nil right after #initialize).
#
# #start passes this value as the second argument of every push onto the
# worker queue; whenever a push returns a falsy value, #start re-checks the
# stop state and pending stat operations before trying again.
# NOTE(review): the unit is presumably seconds, as the name says -- confirm
# against TimeoutSizedQueue#push.
def thread_queue_polling_timeout_seconds
  @thread_queue_polling_timeout_seconds
end

#thread_queue_size ⇒ Object

Returns the value of attribute thread_queue_size.



196
197
198
# File 'lib/riser/server.rb', line 196

# Returns the current value of @thread_queue_size (nil right after
# #initialize).
#
# #start passes this value as the first argument to TimeoutSizedQueue.new
# when it creates the queue shared with the worker threads.
def thread_queue_size
  @thread_queue_size
end

Instance Method Details

#accept(&block) ⇒ Object

:yields:



229
230
231
232
# File 'lib/riser/server.rb', line 229

# Registers the block that supplies the next socket.
#
# The block is stored in @accept. #start calls it with no arguments and keeps
# calling it until it returns a truthy value; that value is then pushed onto
# the worker queue.
#
# Always returns nil.
def accept(&block)          # :yields:
  @accept = block
  nil
end

#accept_return(&block) ⇒ Object

:yields:



234
235
236
237
# File 'lib/riser/server.rb', line 234

# Registers the block run once an accepted socket has been dealt with.
#
# The block is stored in @accept_return. #start calls it with no arguments
# after an accepted socket has been pushed onto the worker queue, and also
# right after closing a socket that could not be queued because of a forced
# stop.
#
# Always returns nil.
def accept_return(&block)   # :yields:
  @accept_return = block
  nil
end

#at_stat(&block) ⇒ Object

:yields: stat_info



204
205
206
207
# File 'lib/riser/server.rb', line 204

# Registers the stat callback by storing the block in @at_stat.
#
# Per the :yields: directive the block receives stat_info.
# NOTE(review): the code that invokes @at_stat is not visible in this
# section; presumably it is driven by apply_signal_stat -- confirm.
#
# Always returns nil.
def at_stat(&block)         # :yields: stat_info
  @at_stat = block
  nil
end

#at_stat_get(&block) ⇒ Object

:yields: reset



209
210
211
212
# File 'lib/riser/server.rb', line 209

# Registers the stat-get callback by storing the block in @at_stat_get.
#
# Per the :yields: directive the block receives a reset argument.
# NOTE(review): the code that invokes @at_stat_get is not visible in this
# section; presumably it handles the :get / :get_and_reset operations queued
# by #signal_stat_get -- confirm.
#
# Always returns nil.
def at_stat_get(&block)     # :yields: reset
  @at_stat_get = block
  nil
end

#at_stat_stop(&block) ⇒ Object

:yields:



214
215
216
217
# File 'lib/riser/server.rb', line 214

# Registers the stat-stop callback by storing the block in @at_stat_stop.
#
# NOTE(review): the code that invokes @at_stat_stop is not visible in this
# section; presumably it handles the :stop operation queued by
# #signal_stat_stop -- confirm.
#
# Always returns nil.
def at_stat_stop(&block)    # :yields:
  @at_stat_stop = block
  nil
end

#at_stop(&block) ⇒ Object

:yields: stop_state



199
200
201
202
# File 'lib/riser/server.rb', line 199

# Registers the block notified when the dispatcher stops.
#
# The block is stored in @at_stop. #start calls it with the current stop
# state once the accept loop has ended and the worker queue has been closed,
# before the worker threads are joined or killed.
#
# Always returns nil.
def at_stop(&block)         # :yields: stop_state
  @at_stop = block
  nil
end

#dispatch(&block) ⇒ Object

:yields: socket



239
240
241
242
# File 'lib/riser/server.rb', line 239

# Registers the block that handles one socket.
#
# The block is stored in @dispatch. The worker threads created by #start call
# it with each socket popped from the queue; once the block returns or
# raises, the worker closes the socket unless it is already closed.
#
# Always returns nil.
def dispatch(&block)        # :yields: socket
  @dispatch = block
  nil
end

#postprocess(&block) ⇒ Object

:yields:



224
225
226
227
# File 'lib/riser/server.rb', line 224

# Registers the block run at the very end of #start.
#
# The block is stored in @postprocess. #start calls it with no arguments from
# an ensure clause, so it runs whether the dispatcher stopped normally or an
# exception is propagating.
#
# Always returns nil.
def postprocess(&block)     # :yields:
  @postprocess = block
  nil
end

#preprocess(&block) ⇒ Object

:yields:



219
220
221
222
# File 'lib/riser/server.rb', line 219

# Registers the block run at the very beginning of #start.
#
# The block is stored in @preprocess. #start calls it with no arguments
# before it creates the worker queue and the worker threads.
#
# Always returns nil.
def preprocess(&block)      # :yields:
  @preprocess = block
  nil
end

#signal_stat_get(reset: true) ⇒ Object

should be called from signal(2) handler



257
258
259
260
261
262
263
264
265
# File 'lib/riser/server.rb', line 257

# should be called from signal(2) handler
#
# Queues a stat request on @stat_operation_queue: :get_and_reset when
# +reset+ is truthy (the default), :get otherwise. Always returns nil.
def signal_stat_get(reset: true)
  operation = reset ? :get_and_reset : :get
  @stat_operation_queue << operation
  nil
end

#signal_stat_stop ⇒ Object

should be called from signal(2) handler



268
269
270
271
# File 'lib/riser/server.rb', line 268

# should be called from signal(2) handler
#
# Queues a :stop request on @stat_operation_queue. Always returns nil.
def signal_stat_stop
  @stat_operation_queue.push(:stop)
  nil
end

#signal_stop_forced ⇒ Object

should be called from signal(2) handler



251
252
253
254
# File 'lib/riser/server.rb', line 251

# should be called from signal(2) handler
#
# Requests a forced stop. The first stop request wins: a stop state that is
# already set is left untouched. Always returns nil.
def signal_stop_forced
  @stop_state = :forced unless @stop_state
  nil
end

#signal_stop_graceful ⇒ Object

should be called from signal(2) handler



245
246
247
248
# File 'lib/riser/server.rb', line 245

# should be called from signal(2) handler
#
# Requests a graceful stop. The first stop request wins: a stop state that
# is already set is left untouched. Always returns nil.
def signal_stop_graceful
  @stop_state = :graceful unless @stop_state
  nil
end

#start(_server_socket = nil) ⇒ Object

should be executed on the main thread sharing the stack with signal(2) handlers

_server_socket is a dummy argument to call like SocketProcessDispatcher#start.



301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
# File 'lib/riser/server.rb', line 301

# Runs the dispatcher until a stop is requested.
#
# should be executed on the main thread sharing the stack with signal(2)
# handlers
#
# _server_socket is a dummy argument to call like
# SocketProcessDispatcher#start.
#
# Flow:
# 1. call the preprocess block, create the queue and @thread_num workers;
# 2. repeatedly obtain a socket from the accept block and push it onto the
#    queue, checking the stop state and pending stat operations in between;
# 3. close the queue, call the at_stop block, then join the workers
#    (graceful) or kill them (forced);
# 4. call the postprocess block (always, via ensure).
#
# An error rescued in a worker thread turns the stop into a forced one and
# is re-raised from here after the postprocess block has run. Returns nil
# otherwise.
def start(_server_socket=nil)
  error_mutex = Mutex.new
  worker_error = nil

  @preprocess.call
  begin
    socket_queue = TimeoutSizedQueue.new(@thread_queue_size, name: @thread_queue_name)
    begin
      workers = @thread_num.times.map { |number|
        Thread.new {
          begin
            Thread.current[:number] = number
            # pop returning a falsy value ends this worker.
            while (client = socket_queue.pop)
              begin
                @dispatch.call(client)
              ensure
                client.close unless client.closed?
              end
            end
          rescue => e
            # Remember the failure; the accept loop turns it into a forced stop.
            error_mutex.synchronize { worker_error = e }
          end
        }
      }

      catch(:end_of_server) {
        while true
          # Poll the accept block until it hands over a socket.
          accepted = nil
          until accepted
            @stop_state = :forced if error_mutex.synchronize { worker_error }
            throw :end_of_server if @stop_state
            apply_signal_stat(socket_queue)
            accepted = @accept.call
          end

          # Retry the push until it succeeds; only a forced stop abandons the
          # socket, a graceful stop still queues it.
          until socket_queue.push(accepted, @thread_queue_polling_timeout_seconds)
            @stop_state = :forced if error_mutex.synchronize { worker_error }
            if @stop_state == :forced
              accepted.close
              @accept_return.call
              throw :end_of_server
            end
            apply_signal_stat(socket_queue)
          end
          @accept_return.call
        end
      }
    ensure
      socket_queue.close
    end

    @at_stop.call(@stop_state)
    case @stop_state
    when :graceful
      workers.each(&:join)
    when :forced
      workers.each(&:kill)
    else
      raise "internal error: unknown stop state <#{@stop_state.inspect}>"
    end
  ensure
    @postprocess.call
  end

  error_mutex.synchronize {
    raise worker_error if worker_error
  }

  nil
end