Class: GRPC::RpcServer::Pool

Inherits:
Object
  • Object
show all
Defined in:
lib/grpc/generic/rpc_server.rb

Overview

Pool is a simple thread pool for running server requests.

Instance Method Summary collapse

Constructor Details

#initialize(size) ⇒ Pool

Returns a new instance of Pool.



271
272
273
274
275
276
277
278
279
# File 'lib/grpc/generic/rpc_server.rb', line 271

# Sets up an idle pool. No threads are created here; that happens in #start.
#
# @param size [Integer] the number of worker threads #start will spawn
# @raise [RuntimeError] if size is not greater than zero
def initialize(size)
  raise 'pool size must be positive' unless size > 0

  # Sizing and lifecycle flag.
  @size = size
  @stopped = false

  # Pending work, stored as [block, args] pairs.
  @jobs = Queue.new

  # Used by #stop to wait until the workers have all deregistered.
  @stop_mutex = Mutex.new
  @stop_cond = ConditionVariable.new

  # Worker threads; filled in by #start.
  @workers = []
end

Instance Method Details

#jobs_waiting ⇒ Object

Returns the number of jobs waiting



282
283
284
# File 'lib/grpc/generic/rpc_server.rb', line 282

# Reports how many scheduled jobs are still sitting in the job queue.
#
# @return [Integer] the current length of the job queue
def jobs_waiting
  @jobs.length
end

#schedule(*args, &blk) ⇒ Object

Runs the given block on the queue with the provided args.

Parameters:

  • args

    the args passed to blk when it is called

  • blk

    the block to call



290
291
292
293
294
295
# File 'lib/grpc/generic/rpc_server.rb', line 290

# Queues a block, together with the args it should be called with, for a
# worker thread to run later.
#
# @param args the arguments handed to blk when a worker calls it
# @param blk the block to run; when omitted nothing is queued
# @return [Queue, nil] the job queue after the push, or nil when no block
#   was given
# @raise [RuntimeError] if the pool has already been stopped
def schedule(*args, &blk)
  raise 'already stopped' if @stopped

  unless blk.nil?
    logger.info('schedule another job')
    @jobs.push([blk, args])
  end
end

#start ⇒ Object

Starts running the jobs in the thread pool.



298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
# File 'lib/grpc/generic/rpc_server.rb', line 298

# Spawns worker threads until the pool holds @size.to_i of them.
#
# Each worker repeatedly pops a [block, args] pair off the job queue and
# calls the block with those args. A StandardError raised by a job is logged
# as a warning and the worker carries on with the next job. A job that does
# `throw :exit` ends its worker, which then removes itself from @workers and
# signals @stop_cond if it was the last one.
#
# @raise [RuntimeError] if the pool has already been stopped
def start
  raise 'already stopped' if @stopped

  target = @size.to_i
  while @workers.size != target
    worker = Thread.new do
      # A job can terminate this worker with { throw :exit }.
      catch(:exit) do
        loop do
          begin
            job, job_args = @jobs.pop
            job.call(*job_args)
          rescue StandardError => err
            logger.warn('Error in worker thread')
            logger.warn(err)
          end
        end
      end

      # Deregister this thread, and wake whoever waits on @stop_cond once no
      # workers remain.
      @stop_mutex.synchronize do
        @workers.delete(Thread.current)
        @stop_cond.signal if @workers.empty?
      end
    end
    @workers.push(worker)
  end
end

#stop ⇒ Object

Stops the jobs in the pool



326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
# File 'lib/grpc/generic/rpc_server.rb', line 326

# Stops the pool: marks it stopped, asks every live worker to exit, waits up
# to the keep-alive period for them to deregister, then kills any worker that
# is still registered.
#
# All reads and writes of @workers happen while holding @stop_mutex, the same
# lock the workers take when they remove themselves, so the list is never
# iterated while it is being modified. When this method returns @workers is
# empty, and calling it again is harmless.
#
# @return the result of the final logger.info call
def stop
  logger.info('stopping, will wait for all the workers to exit')

  # TODO: allow configuration of the keepalive period
  keep_alive = 5
  stragglers = @stop_mutex.synchronize do
    # Refuse new work before queueing the exit jobs, so that nothing can be
    # scheduled behind them and then sit in the queue forever.
    @stopped = true

    # A worker that is no longer alive can never deregister itself; waiting
    # for it would always burn the whole keep-alive period.
    @workers.select!(&:alive?)

    # One exit job per live worker. These go straight onto the queue because
    # #schedule rejects jobs once @stopped is set.
    @workers.size.times { @jobs << [proc { throw :exit }, []] }

    # Wait until the last worker signals or the deadline passes. The
    # condition is re-checked after every wake-up so an early return from
    # wait cannot cut the grace period short.
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + keep_alive
    until @workers.empty?
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      break unless remaining > 0
      @stop_cond.wait(@stop_mutex, remaining)
    end

    # Hand the leftovers to the caller and empty the registry while the lock
    # is still held.
    leftover = @workers.dup
    @workers.clear
    leftover
  end

  # Forcibly shutdown any threads that are still alive.
  unless stragglers.empty?
    logger.warn("forcibly terminating #{stragglers.size} worker(s)")
    stragglers.each do |t|
      next unless t.alive?
      begin
        t.exit
      rescue StandardError => e
        logger.warn('error while terminating a worker')
        logger.warn(e)
      end
    end
  end

  logger.info('stopped, all workers are shutdown')
end