Class: DatTCP::WorkerPool
- Inherits: Object
  - Object
  - DatTCP::WorkerPool
- Defined in: lib/dat-tcp/worker_pool.rb
Instance Attribute Summary
- #logger ⇒ Object (readonly): Returns the value of attribute logger.
- #spawned ⇒ Object (readonly): Returns the value of attribute spawned.
Instance Method Summary
- #connections ⇒ Object
- #despawn_worker(worker) ⇒ Object: Public because workers need to call it on themselves.
- #enqueue_connection(socket) ⇒ Object: Checks whether all workers are busy before adding the connection.
- #initialize(min = 0, max = 1, debug = false, &serve_proc) ⇒ WorkerPool (constructor): Returns a new instance of WorkerPool.
- #shutdown(timeout) ⇒ Object: Shuts down each worker and then the queue.
- #waiting ⇒ Object
Constructor Details
#initialize(min = 0, max = 1, debug = false, &serve_proc) ⇒ WorkerPool
Returns a new instance of WorkerPool.
    # File 'lib/dat-tcp/worker_pool.rb', line 13
    def initialize(min = 0, max = 1, debug = false, &serve_proc)
      @min_workers = min
      @max_workers = max
      @debug       = debug
      @logger      = DatTCP::Logger.new(@debug)
      @serve_proc  = serve_proc

      @queue           = DatTCP::Queue.new
      @workers_waiting = DatTCP::WorkersWaiting.new

      @mutex   = Mutex.new
      @workers = []
      @spawned = 0

      @min_workers.times{ self.spawn_worker }
    end
Instance Attribute Details
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
    # File 'lib/dat-tcp/worker_pool.rb', line 11
    def logger
      @logger
    end
#spawned ⇒ Object (readonly)
Returns the value of attribute spawned.
    # File 'lib/dat-tcp/worker_pool.rb', line 11
    def spawned
      @spawned
    end
Instance Method Details
#connections ⇒ Object
    # File 'lib/dat-tcp/worker_pool.rb', line 30
    def connections
      @queue.items
    end
#despawn_worker(worker) ⇒ Object
Public because workers need to call it on themselves.
    # File 'lib/dat-tcp/worker_pool.rb', line 75
    def despawn_worker(worker)
      @mutex.synchronize do
        @spawned -= 1
        @workers.delete worker
      end
    end
#enqueue_connection(socket) ⇒ Object
Checks whether all workers are busy before adding the connection to the queue. The order matters: adding the connection wakes an idle worker, if there is one, so a check made after the push is unreliable. It could report every worker as busy only because a sleeping worker had just been woken to serve this connection, and the pool would then spawn a worker with nothing to do.
    # File 'lib/dat-tcp/worker_pool.rb', line 43
    def enqueue_connection(socket)
      return if !socket
      new_worker_needed = all_workers_are_busy?
      @queue.push socket
      self.spawn_worker if new_worker_needed && havent_reached_max_workers?
    end
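The check-then-push ordering can be illustrated with a toy model. This is a minimal sketch, not the library's implementation: `ToyPool`, its keyword arguments, and `enqueue_check_after_push` are hypothetical names, and the sketch assumes the worst case, in which an idle worker wakes the instant a connection is pushed.

```ruby
# Hypothetical toy model; only the check / push / spawn ordering mirrors
# enqueue_connection. Workers are plain counters here, not threads.
class ToyPool
  attr_reader :queue, :spawned

  def initialize(max:, spawned:, waiting:)
    @max     = max
    @spawned = spawned
    @waiting = waiting # number of idle workers
    @queue   = []
  end

  def all_workers_are_busy?
    @waiting.zero?
  end

  # Same order as the source: sample busyness first, then push, then spawn.
  def enqueue(conn)
    return if !conn
    new_worker_needed = all_workers_are_busy?
    push(conn)
    @spawned += 1 if new_worker_needed && @spawned < @max
  end

  # The unreliable order, for comparison: the push has already woken the
  # idle worker by the time busyness is sampled.
  def enqueue_check_after_push(conn)
    return if !conn
    push(conn)
    @spawned += 1 if all_workers_are_busy? && @spawned < @max
  end

  private

  # Assumption: pushing immediately wakes one idle worker, if any.
  def push(conn)
    @queue.push(conn)
    @waiting -= 1 if @waiting > 0
  end
end

good = ToyPool.new(max: 2, spawned: 1, waiting: 1)
good.enqueue(:conn)
good.spawned # => 1, the idle worker takes the connection

bad = ToyPool.new(max: 2, spawned: 1, waiting: 1)
bad.enqueue_check_after_push(:conn)
bad.spawned # => 2, a second worker was spawned with nothing to do
```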
#shutdown(timeout) ⇒ Object
Shuts down each worker and then the queue. Shutting down the queue signals any workers waiting on it to wake up so that they can begin shutting down. A worker that is processing a connection is joined and allowed to finish. If the workers do not finish within the timeout, the resulting DatTCP::TimeoutError is re-raised in debug mode and logged otherwise. NOTE: Any connections still on the queue are not served.
    # File 'lib/dat-tcp/worker_pool.rb', line 55
    def shutdown(timeout)
      begin
        SystemTimer.timeout(timeout, DatTCP::TimeoutError) do
          @workers.each(&:shutdown)
          @queue.shutdown

          # use this pattern instead of `each` -- we don't want to call `join` on
          # every worker (especially if they are shutting down on their own), we
          # just want to make sure that any who haven't had a chance to finish
          # get to (this is safe, otherwise you might get a dead thread in the
          # `each`).
          @workers.first.join until @workers.empty?
        end
      rescue DatTCP::TimeoutError => exception
        exception.message.replace "Timed out shutting down the server"
        @debug ? raise(exception) : self.logger.error(exception.message)
      end
    end
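The join-the-first-worker-until-empty pattern relies on workers removing themselves from the list as they exit. The sketch below shows that interaction in isolation. It rests on stated assumptions: `ToyPool`, `despawn`, and the `:clean` / `:timed_out` return values are hypothetical, the standard library's `Timeout.timeout` is swapped in for `SystemTimer.timeout`, and the workers simply sleep instead of serving connections. The sketch also reads the first worker under the mutex, a variation on the one-line form in the source, so that it never calls `join` on `nil`.

```ruby
require "timeout"

# Hypothetical stand-in for the pool; each worker is a thread that sleeps
# for the given number of seconds and then removes itself from the list.
class ToyPool
  attr_reader :workers

  def initialize(durations)
    @mutex   = Mutex.new
    @workers = []
    durations.each do |seconds|
      # Append under the mutex so a fast worker cannot despawn itself
      # before it has been added to the list.
      @mutex.synchronize do
        @workers << Thread.new do
          sleep(seconds)          # pretend to finish serving a connection
          despawn(Thread.current) # the worker calls this on itself
        end
      end
    end
  end

  def despawn(worker)
    @mutex.synchronize { @workers.delete(worker) }
  end

  # Join whichever worker is currently first until the list drains.
  # Workers that already exited are gone from the list and never joined.
  def shutdown(timeout)
    Timeout.timeout(timeout) do
      while (worker = @mutex.synchronize { @workers.first })
        worker.join
      end
    end
    :clean
  rescue Timeout::Error
    :timed_out
  end
end
```

With short-lived workers, `ToyPool.new([0.01, 0.03]).shutdown(2)` drains the list and returns `:clean`. A worker that outlives the timeout makes `shutdown` return `:timed_out` and leaves that worker in the list.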
#waiting ⇒ Object
    # File 'lib/dat-tcp/worker_pool.rb', line 34
    def waiting
      @workers_waiting.count
    end