Class: DatTCP::WorkerPool

Inherits:
Object
Defined in:
lib/dat-tcp/worker_pool.rb

Constructor Details

#initialize(min = 0, max = 1, debug = false, &serve_proc) ⇒ WorkerPool

Returns a new instance of WorkerPool. The pool spawns min workers immediately; additional workers are spawned on demand as connections are enqueued, up to max.

# File 'lib/dat-tcp/worker_pool.rb', line 13

def initialize(min = 0, max = 1, debug = false, &serve_proc)
  @min_workers = min
  @max_workers = max
  @debug       = debug
  @logger      = DatTCP::Logger.new(@debug)
  @serve_proc  = serve_proc

  @queue           = DatTCP::Queue.new
  @workers_waiting = DatTCP::WorkersWaiting.new

  @mutex   = Mutex.new
  @workers = []
  @spawned = 0

  @min_workers.times{ self.spawn_worker }
end

Instance Attribute Details

#logger ⇒ Object (readonly)

Returns the value of attribute logger.

# File 'lib/dat-tcp/worker_pool.rb', line 11

def logger
  @logger
end

#spawned ⇒ Object (readonly)

Returns the value of attribute spawned.

# File 'lib/dat-tcp/worker_pool.rb', line 11

def spawned
  @spawned
end

Instance Method Details

#connections ⇒ Object

Returns the items currently held in the connection queue.

# File 'lib/dat-tcp/worker_pool.rb', line 30

def connections
  @queue.items
end

#despawn_worker(worker) ⇒ Object

Removes the worker from the pool and decrements the spawned count, both under the pool's mutex. This method is public because workers need to call it on themselves.

# File 'lib/dat-tcp/worker_pool.rb', line 75

def despawn_worker(worker)
  @mutex.synchronize do
    @spawned -= 1
    @workers.delete worker
  end
end
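
The bookkeeping above can be illustrated with a minimal, self-contained sketch. It assumes nothing about DatTCP beyond the listing: `Registry`, `register`, `despawn` and the `gate` queue are hypothetical names used only for illustration. Each thread removes itself from the shared list, and the mutex keeps the counter and the list consistent.

```ruby
# Hypothetical stand-in for the pool's bookkeeping; not DatTCP's code.
class Registry
  attr_reader :spawned, :workers

  def initialize
    @mutex   = Mutex.new
    @workers = []
    @spawned = 0
  end

  # Record a new worker and bump the counter in one critical section.
  def register(worker)
    @mutex.synchronize do
      @spawned += 1
      @workers << worker
    end
  end

  # Mirrors the shape of despawn_worker: decrement and delete together.
  def despawn(worker)
    @mutex.synchronize do
      @spawned -= 1
      @workers.delete(worker)
    end
  end
end

registry = Registry.new
gate     = Queue.new # holds the threads back until all are registered

threads = Array.new(5) do
  Thread.new do
    gate.pop                          # wait for the go signal
    registry.despawn(Thread.current)  # the worker despawns itself
  end
end

threads.each { |thread| registry.register(thread) }
5.times { gate.push(:go) }
threads.each(&:join)

puts registry.spawned        # 0
puts registry.workers.empty? # true
```

Because the decrement and the delete happen inside one `synchronize` block, no other thread can observe a count that disagrees with the list.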

#enqueue_connection(socket) ⇒ Object

Checks whether all workers are busy before adding the connection, not after. Pushing a connection wakes an idle worker, which then stops counting as waiting, so a check made after the push is unreliable: the pool could conclude that all workers are busy when it has only just woken a sleeping worker to serve this connection, and it would then spawn an extra worker with nothing to do.

# File 'lib/dat-tcp/worker_pool.rb', line 43

def enqueue_connection(socket)
  return if !socket
  new_worker_needed = all_workers_are_busy?
  @queue.push socket
  self.spawn_worker if new_worker_needed && havent_reached_max_workers?
end
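
The ordering described above can be shown with a small deterministic sketch. It is not DatTCP's implementation: `SketchPool`, `add_idle_worker`, `enqueue_check_first` and `enqueue_check_last` are hypothetical names, and the idle-worker wakeup is simulated with a counter instead of real threads.

```ruby
# Hypothetical model of the pool; threads are simulated with counters.
class SketchPool
  attr_reader :spawned

  def initialize(max)
    @max     = max
    @spawned = 0
    @waiting = 0 # number of idle workers
    @queue   = []
  end

  def add_idle_worker
    @spawned += 1
    @waiting += 1
  end

  def all_workers_are_busy?
    @waiting.zero?
  end

  # Pushing wakes one idle worker, which then no longer counts as waiting.
  def push(item)
    @queue << item
    @waiting -= 1 if @waiting > 0
  end

  # Same order as enqueue_connection: check, then push, then maybe spawn.
  def enqueue_check_first(item)
    needed = all_workers_are_busy?
    push(item)
    @spawned += 1 if needed && @spawned < @max
  end

  # The unreliable order: push first, then check.
  def enqueue_check_last(item)
    push(item)
    @spawned += 1 if all_workers_are_busy? && @spawned < @max
  end
end

first = SketchPool.new(2)
first.add_idle_worker
first.enqueue_check_first(:socket)

last = SketchPool.new(2)
last.add_idle_worker
last.enqueue_check_last(:socket)

puts first.spawned # 1: the idle worker took the connection
puts last.spawned  # 2: an extra worker was spawned with nothing to do
```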

#shutdown(timeout) ⇒ Object

Shuts down each worker and then the queue. Shutting down the queue signals any workers waiting on it to wake up so that they can start shutting down. A worker that is still processing a connection is joined and allowed to finish. If shutdown takes longer than timeout, a DatTCP::TimeoutError is raised when debug is enabled and logged as an error otherwise. NOTE: Any connections still on the queue are not served.

# File 'lib/dat-tcp/worker_pool.rb', line 55

def shutdown(timeout)
  begin
    SystemTimer.timeout(timeout, DatTCP::TimeoutError) do
      @workers.each(&:shutdown)
      @queue.shutdown

      # use this pattern instead of `each` -- we don't want to call `join` on
      # every worker (especially if they are shutting down on their own), we
      # just want to make sure that any who haven't had a chance to finish
      # get to (this is safe, otherwise you might get a dead thread in the
      # `each`).
      @workers.first.join until @workers.empty?
    end
  rescue DatTCP::TimeoutError => exception
    exception.message.replace "Timed out shutting down the server"
    @debug ? raise(exception) : self.logger.error(exception.message)
  end
end
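
The "join the first worker until none are left" pattern can be sketched on its own. This is a variant under stated assumptions, not the library's code: it uses the standard library's `Timeout` instead of `SystemTimer`, a `nil` item as a hypothetical shutdown signal, and it reads the first worker under a mutex so that a worker removing itself between the emptiness check and the join cannot leave a `nil` to join.

```ruby
require "timeout"

# Hypothetical sketch: each worker removes itself from the shared list
# when it finishes, as despawn_worker does in the real pool.
workers = []
mutex   = Mutex.new
jobs    = Queue.new

3.times do
  thread = Thread.new do
    # Block until an item arrives; nil is this sketch's shutdown signal.
    while (job = jobs.pop)
      job.call
    end
    mutex.synchronize { workers.delete(Thread.current) }
  end
  mutex.synchronize { workers << thread }
end

timed_out = false
begin
  Timeout.timeout(2) do
    # Wake every waiting worker so it can start shutting down.
    3.times { jobs.push(nil) }

    # Join whichever worker is still listed. Workers that already removed
    # themselves are never joined, and the loop ends once the list is empty.
    while (worker = mutex.synchronize { workers.first })
      worker.join
    end
  end
rescue Timeout::Error
  timed_out = true
end

puts workers.empty? # true
puts timed_out      # false
```

Iterating with `each` would instead join every worker that was in the list when iteration began, including those that had already shut down on their own.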

#waiting ⇒ Object

Returns the number of workers currently waiting for a connection.

# File 'lib/dat-tcp/worker_pool.rb', line 34

def waiting
  @workers_waiting.count
end