Class: NatsWork::Server::WorkerManager

Inherits:
Object
  • Object
show all
Defined in:
lib/natswork/worker_manager.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(concurrency: 5) ⇒ WorkerManager

Returns a new instance of WorkerManager.



11
12
13
14
15
16
17
# File 'lib/natswork/worker_manager.rb', line 11

# Builds a stopped manager.
#
# No workers are created here; the list starts empty and the running flag is
# false. Only the thread pool and the mutex used by #start / #stop are
# allocated up front.
#
# @param concurrency [Integer] size handed to Concurrent::FixedThreadPool and
#   the number of workers #start will create
def initialize(concurrency: 5)
  @shutdown_mutex = Mutex.new
  @pool = Concurrent::FixedThreadPool.new(concurrency)
  @concurrency = concurrency
  @running = false
  @workers = []
end

Instance Attribute Details

#pool ⇒ Object (readonly)

Returns the value of attribute pool.



9
10
11
# File 'lib/natswork/worker_manager.rb', line 9

# Returns the thread pool stored in @pool.
#
# @return [Concurrent::FixedThreadPool] the pool assigned in #initialize
def pool
  @pool
end

#running ⇒ Object (readonly)

Returns the value of attribute running.



9
10
11
# File 'lib/natswork/worker_manager.rb', line 9

# Returns the raw @running flag; the read does not take @shutdown_mutex.
#
# @return [Boolean] false after #initialize, set to true by #start and back
#   to false by #stop
def running
  @running
end

#workers ⇒ Object (readonly)

Returns the value of attribute workers.



9
10
11
# File 'lib/natswork/worker_manager.rb', line 9

# Returns the manager's own @workers array (not a copy), so mutating the
# result mutates the manager's list.
#
# @return [Array<SimpleWorker>] workers appended by #start; emptied by #stop
def workers
  @workers
end

Instance Method Details

#running? ⇒ Boolean

Returns:

  • (Boolean)


47
48
49
# File 'lib/natswork/worker_manager.rb', line 47

# Predicate form of the @running flag.
#
# @return [Boolean] true once #start has run and until the next #stop
def running?
  @running
end

#start ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/natswork/worker_manager.rb', line 19

# Starts the manager by creating and starting +@concurrency+ workers.
#
# A call while already running returns immediately. The whole operation runs
# under +@shutdown_mutex+, so it cannot interleave with #stop.
#
# #stop shuts the thread pool down, and a shut-down executor no longer
# accepts work. When the current pool is not running, a fresh one of the same
# size is built first so that a stop/start cycle hands workers a usable pool.
#
# @return [Integer, nil] nil when already running, otherwise the value of
#   +@concurrency.times+
def start
  @shutdown_mutex.synchronize do
    return if @running

    # Replace the pool before flipping the flag: if construction raises, the
    # manager is still consistently "stopped".
    @pool = Concurrent::FixedThreadPool.new(@concurrency) unless @pool.running?
    @running = true

    @concurrency.times do |i|
      worker = SimpleWorker.new(id: "worker-#{i + 1}", pool: @pool)
      @workers << worker
      worker.start
    end
  end
end

#stats ⇒ Object



51
52
53
54
55
56
57
58
59
# File 'lib/natswork/worker_manager.rb', line 51

# Snapshot of the manager's state and of the pool's counters.
#
# @return [Hash{Symbol => Object}] with keys +:running+, +:workers+ (number
#   of workers currently tracked), +:pool_size+, +:queue_length+ and
#   +:completed_tasks+, the last three read from the pool
def stats
  snapshot = { running: @running, workers: @workers.size }
  snapshot[:pool_size] = @pool.length
  snapshot[:queue_length] = @pool.queue_length
  snapshot[:completed_tasks] = @pool.completed_task_count
  snapshot
end

#stop ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/natswork/worker_manager.rb', line 33

# Stops every worker, shuts the thread pool down and forgets the workers.
#
# A call while not running returns immediately. The whole operation runs
# under +@shutdown_mutex+, so it cannot interleave with #start.
#
# The pool shutdown and the clearing of the worker list happen in an
# +ensure+ section: +@running+ is already false by then, so if a worker's
# +stop+ raised and the cleanup were skipped, a later #stop would return
# early and the pool threads would never be released. The worker's exception
# still propagates to the caller.
#
# @return [nil, Array] nil when not running, otherwise the emptied worker list
def stop
  @shutdown_mutex.synchronize do
    return unless @running

    @running = false

    NatsWork.logger.info 'Stopping workers...'
    begin
      @workers.each(&:stop)
    ensure
      @pool.shutdown
      # wait_for_termination reports false on timeout; surface that instead
      # of silently leaving tasks running.
      unless @pool.wait_for_termination(10)
        NatsWork.logger.warn 'Worker pool did not terminate within 10 seconds'
      end
      @workers.clear
    end
  end
end