Class: Cuetip::WorkerGroup

Inherits:
Object
  • Object
show all
Includes:
ActiveSupport::Callbacks
Defined in:
lib/cuetip/worker_group.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(quantity, queues) ⇒ WorkerGroup

Returns a new instance of WorkerGroup.



15
16
17
18
19
20
# File 'lib/cuetip/worker_group.rb', line 15

def initialize(quantity, queues)
  @quantity = quantity
  @queues = queues || []
  @workers = {}
  @threads = {}
end

Instance Attribute Details

#quantityObject (readonly)

Returns the value of attribute quantity.



11
12
13
# File 'lib/cuetip/worker_group.rb', line 11

def quantity
  @quantity
end

#threadsObject (readonly)

Returns the value of attribute threads.



13
14
15
# File 'lib/cuetip/worker_group.rb', line 13

def threads
  @threads
end

#workersObject (readonly)

Returns the value of attribute workers.



12
13
14
# File 'lib/cuetip/worker_group.rb', line 12

def workers
  @workers
end

Instance Method Details

#set_process_nameObject



49
50
51
52
# File 'lib/cuetip/worker_group.rb', line 49

def set_process_name
  thread_names = @workers.values.map(&:status)
  Process.setproctitle("Cuetip: #{thread_names.inspect}")
end

#startObject



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/cuetip/worker_group.rb', line 22

def start
  Cuetip.logger.info "Starting #{@quantity} Cuetip workers"
  if @queues.any?
    @queues.each { |q| Cuetip.logger.info "-> Joined queue: #{q.to_s}" }
  end

  exit_trap = proc do
    @workers.each { |_, worker| worker.request_exit! }
    puts 'Exiting...'
  end

  trap('INT', &exit_trap)
  trap('TERM', &exit_trap)

  @quantity.times do |i|
    @workers[i] = Worker.new(self, i, @queues)
    Cuetip.logger.info "-> Starting worker #{i}"
    @threads[i] = Thread.new(@workers[i]) do |worker|
      run_callbacks :run_worker do
        worker.run
      end
    end
    @threads[i].abort_on_exception = true
  end
  @threads.values.each(&:join)
end