Class: GRPC::RpcServer::Pool

Inherits:
Object
  • Object
show all
Defined in:
lib/grpc/generic/rpc_server.rb

Overview

Pool is a simple thread pool for running server requests.

Instance Method Summary collapse

Constructor Details

#initialize(size) ⇒ Pool

Returns a new instance of Pool.



278
279
280
281
282
283
284
285
286
# File 'lib/grpc/generic/rpc_server.rb', line 278

# Sets up an idle pool. No worker threads are created here; they are
# spawned later by #start.
#
# @param size [Integer] how many worker threads #start will spawn; must be
#   greater than zero
# @raise [RuntimeError] when size is not greater than zero
def initialize(size)
  raise 'pool size must be positive' unless size > 0
  @size = size
  @workers = []
  # Pending jobs, each stored as a [block, args] pair.
  @jobs = Queue.new
  @stopped = false
  # The condition variable and mutex are used by #stop to wait until the
  # workers have removed themselves from @workers.
  @stop_cond = ConditionVariable.new
  @stop_mutex = Mutex.new
end

Instance Method Details

#jobs_waiting ⇒ Object

Returns the number of jobs waiting



289
290
291
# File 'lib/grpc/generic/rpc_server.rb', line 289

# Reports how many scheduled jobs are currently sitting in the job queue.
#
# @return [Integer] the current length of the job queue
def jobs_waiting
  @jobs.length
end

#schedule(*args, &blk) ⇒ Object

Runs the given block on the queue with the provided args.

Parameters:

  • args

    the args passed to blk when it is called

  • blk

    the block to call



297
298
299
300
301
302
# File 'lib/grpc/generic/rpc_server.rb', line 297

# Queues a block, together with the arguments it should be called with, for
# execution by one of the pool's worker threads.
#
# @param args the arguments handed to blk when it is eventually called
# @param blk the block to run; when omitted nothing is queued
# @return [Queue, nil] the job queue, or nil when no block was given
# @raise [RuntimeError] when the pool has already been stopped
def schedule(*args, &blk)
  # The stopped check comes first, so it fires even when no block is given.
  raise 'already stopped' if @stopped
  unless blk.nil?
    logger.info('schedule another job')
    @jobs.push([blk, args])
  end
end

#start ⇒ Object

Starts running the jobs in the thread pool.



305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
# File 'lib/grpc/generic/rpc_server.rb', line 305

# Spawns worker threads until the pool holds @size of them. Each worker
# repeatedly pops a [block, args] job off the queue and calls it.
#
# @raise [RuntimeError] when the pool has already been stopped
def start
  raise 'already stopped' if @stopped
  while @workers.size != @size.to_i
    worker = Thread.new do
      # A job that does `throw :exit` unwinds out of the loop below, which is
      # how a worker is asked to finish.
      catch(:exit) do
        loop do
          begin
            job, job_args = @jobs.pop
            job.call(*job_args)
          rescue StandardError => err
            # A failing job is logged and the worker keeps serving jobs.
            logger.warn('Error in worker thread')
            logger.warn(err)
          end
        end
      end

      # The worker is done: drop it from the roster and, if it was the last
      # one, signal the condition variable that #stop waits on.
      @stop_mutex.synchronize do
        @workers.delete(Thread.current)
        @stop_cond.signal if @workers.empty?
      end
    end
    @workers.push(worker)
  end
end

#stop ⇒ Object

Stops the jobs in the pool



333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
# File 'lib/grpc/generic/rpc_server.rb', line 333

# Stops the pool: queues one exit job per worker, waits for the workers to
# finish, and forcibly terminates any that are still registered once the
# wait is over.
#
# @param keep_alive [Numeric] seconds to wait for the workers to exit on
#   their own before the remaining ones are killed; defaults to 5
def stop(keep_alive = 5)
  logger.info('stopping, will wait for all the workers to exit')
  # One exit job per worker: the worker loop in #start runs jobs inside
  # catch(:exit), so each of these jobs ends the worker that picks it up.
  @workers.size.times { schedule { throw :exit } }
  @stopped = true

  # Wait for the last worker to signal, then detach whatever is left while
  # still holding the lock. Exiting workers delete themselves from @workers
  # under this same mutex, so iterating the live array outside the lock could
  # skip a thread; emptying it also lets a repeated #stop return cleanly
  # instead of trying to schedule exit jobs on a stopped pool.
  stragglers = @stop_mutex.synchronize do
    @stop_cond.wait(@stop_mutex, keep_alive) unless @workers.empty?
    leftover = @workers.dup
    @workers.clear
    leftover
  end

  # Forcibly shutdown any threads that are still alive.
  unless stragglers.empty?
    logger.warn("forcibly terminating #{stragglers.size} worker(s)")
    stragglers.each do |t|
      next unless t.alive?
      begin
        t.exit
      rescue StandardError => e
        logger.warn('error while terminating a worker')
        logger.warn(e)
      end
    end
  end

  logger.info('stopped, all workers are shutdown')
end