Class: GRPC::Pool
- Inherits: Object (Object → GRPC::Pool)
- Defined in: lib/grpc/generic/rpc_server.rb
Overview
Pool is a simple thread pool.
Constant Summary
- DEFAULT_KEEP_ALIVE = 1
  Default keep alive period is 1s.
Instance Method Summary
- #initialize(size, keep_alive: DEFAULT_KEEP_ALIVE) ⇒ Pool (constructor)
  A new instance of Pool.
- #jobs_waiting ⇒ Object
  Returns the number of jobs waiting in the queue.
- #schedule(*args, &blk) ⇒ Object
  Queues the given block, together with the provided args, as a job.
- #start ⇒ Object
  Starts the worker threads that run the jobs in the thread pool.
- #stop ⇒ Object
  Stops the pool, waiting up to keep_alive seconds for the workers to exit.
Constructor Details
#initialize(size, keep_alive: DEFAULT_KEEP_ALIVE) ⇒ Pool
Returns a new instance of Pool. Fails with 'pool size must be positive' unless size is greater than 0.
    # File 'lib/grpc/generic/rpc_server.rb', line 74
    def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE)
      fail 'pool size must be positive' unless size > 0
      @jobs = Queue.new
      @size = size
      @stopped = false
      @stop_mutex = Mutex.new
      @stop_cond = ConditionVariable.new
      @workers = []
      @keep_alive = keep_alive
    end
Instance Method Details
#jobs_waiting ⇒ Object
Returns the number of jobs waiting in the queue.
    # File 'lib/grpc/generic/rpc_server.rb', line 86
    def jobs_waiting
      @jobs.size
    end
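As a rough illustration of what the returned number measures, the sketch below uses a plain Ruby `Queue`, the same class the constructor assigns to `@jobs`. It does not use GRPC::Pool itself, and the helper name `waiting_after` is hypothetical.

```ruby
# Hypothetical helper, not part of GRPC::Pool: enqueue `pushed` jobs,
# let a consumer take `popped` of them, and report Queue#size.
def waiting_after(pushed, popped)
  jobs = Queue.new
  pushed.times { |i| jobs << [proc { i }, []] }
  popped.times { jobs.pop } # a popped job no longer counts as waiting
  jobs.size
end

puts waiting_after(3, 1) # prints 2
```

Under this reading, a job counts as waiting only until something pops it from the queue.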
#schedule(*args, &blk) ⇒ Object
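Queues the given block, together with the provided args, as a job. Fails with 'already stopped' if the pool has been stopped; returns without queuing anything if no block is given.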
    # File 'lib/grpc/generic/rpc_server.rb', line 94
    def schedule(*args, &blk)
      fail 'already stopped' if @stopped
      return if blk.nil?
      logger.info('schedule another job')
      @jobs << [blk, args]
    end
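The listing shows that a job is stored as a `[blk, args]` pair. A minimal sketch of that round trip follows, using only a standard `Queue`. The helpers `enqueue` and `run_next` are hypothetical, and the worker-side step (pop the pair, call the block with the args) is an assumption, since the method that consumes the queue is not shown on this page.

```ruby
# Hypothetical stand-in for the queuing step shown above.
def enqueue(jobs, *args, &blk)
  return if blk.nil? # as in the listing: no block, nothing is queued
  jobs << [blk, args]
end

# Assumed worker-side step: pop one job and call its block with its args.
def run_next(jobs)
  blk, args = jobs.pop
  blk.call(*args)
end

jobs = Queue.new
enqueue(jobs, 2, 3) { |a, b| a + b }
enqueue(jobs)        # ignored, no block given
puts jobs.size       # prints 1
puts run_next(jobs)  # prints 5
```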
#start ⇒ Object
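Starts worker threads until the pool holds size workers; each worker runs jobs until one of them throws :exit. Fails with 'already stopped' if the pool has been stopped.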
    # File 'lib/grpc/generic/rpc_server.rb', line 102
    def start
      fail 'already stopped' if @stopped
      until @workers.size == @size.to_i
        next_thread = Thread.new do
          catch(:exit) do  # allows { throw :exit } to kill a thread
            loop_execute_jobs
          end
          remove_current_thread
        end
        @workers << next_thread
      end
    end
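The `catch(:exit)` wrapper lets a job end its worker by throwing `:exit`. The sketch below shows that mechanism in isolation with a single thread and a plain `Queue`. The helper `run_worker` and its job loop are hypothetical stand-ins: the body of `loop_execute_jobs` is not shown on this page, so the loop here is an assumption about its general shape.

```ruby
# Hypothetical worker: runs jobs from the queue until one of them
# throws :exit, then returns how many jobs completed normally.
def run_worker(jobs)
  Thread.new do
    completed = 0
    catch(:exit) do
      loop do
        blk, args = jobs.pop
        blk.call(*args) # a job that throws :exit unwinds to catch(:exit)
        completed += 1
      end
    end
    completed
  end
end

jobs = Queue.new
jobs << [proc { |x| x * 2 }, [21]]
jobs << [proc { throw :exit }, []]
jobs << [proc { :never_run }, []]

worker = run_worker(jobs)
puts worker.value # prints 1: one job finished before the exit job
puts jobs.size    # prints 1: the last job was never picked up
```

Because `throw` only unwinds within the thread that called `catch`, each worker needs its own exit job.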
#stop ⇒ Object
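Stops the pool. Queues one { throw :exit } job per worker, marks the pool as stopped, waits up to keep_alive seconds for the workers to exit, then calls forcibly_stop_workers.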
    # File 'lib/grpc/generic/rpc_server.rb', line 116
    def stop
      logger.info('stopping, will wait for all the workers to exit')
      @workers.size.times { schedule { throw :exit } }
      @stopped = true
      @stop_mutex.synchronize do  # wait @keep_alive for workers to stop
        @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
      end
      forcibly_stop_workers
      logger.info('stopped, all workers are shutdown')
    end
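The bounded wait in the listing relies on `ConditionVariable#wait(mutex, timeout)`. The sketch below shows that pattern on its own, under some assumptions: the helper `wait_for_worker` is hypothetical, the worker is assumed to signal the condition when it finishes (the code that signals `@stop_cond` is not shown on this page), and `Thread#kill` stands in for whatever `forcibly_stop_workers` does. Unlike the listing, the sketch loops against a deadline to guard against spurious wakeups.

```ruby
# Hypothetical helper: wait up to `timeout` seconds for a worker thread
# to finish, then report whether it finished in time.
def wait_for_worker(work_seconds, timeout)
  mutex = Mutex.new
  cond = ConditionVariable.new
  done = false

  worker = Thread.new do
    sleep work_seconds
    mutex.synchronize do
      done = true
      cond.signal # assumed: the worker announces that it has exited
    end
  end

  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  finished = mutex.synchronize do
    until done
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      break if remaining <= 0
      cond.wait(mutex, remaining) # releases the mutex while waiting
    end
    done
  end

  worker.kill unless finished # stand-in for a forcible stop
  worker.join
  finished
end

puts wait_for_worker(0.01, 1) # the worker finishes well within the timeout
puts wait_for_worker(1, 0.05) # the timeout elapses first, the worker is killed
```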