Class: BackgroundQueue::ServerLib::WorkerBalancer

Inherits:
Object
  • Object
show all
Defined in:
lib/background_queue/server_lib/worker_balancer.rb

Overview

make sure each worker gets its fair share of tasks track the number of active connections to use as the balancing metric

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server) ⇒ WorkerBalancer

Returns a new instance of WorkerBalancer.



9
10
11
12
13
14
15
16
17
18
# File 'lib/background_queue/server_lib/worker_balancer.rb', line 9

def initialize(server)
  @server = server
  @mutex = Mutex.new
  @offline_workers = []
  @available_workers = SortedWorkers.new
  for worker_config in server.config.workers.reverse
    worker = Worker.new(worker_config.uri)
    @available_workers.add_worker(worker)
  end
end

Instance Attribute Details

#available_workersObject (readonly)

Returns the value of attribute available_workers.



6
7
8
# File 'lib/background_queue/server_lib/worker_balancer.rb', line 6

def available_workers
  @available_workers
end

#offline_workersObject (readonly)

Returns the value of attribute offline_workers.



7
8
9
# File 'lib/background_queue/server_lib/worker_balancer.rb', line 7

def offline_workers
  @offline_workers
end

Instance Method Details

#check_offlineObject

poll the workers that are marked as offline, and mark them online if the polling succeeded



21
22
23
24
25
26
27
28
29
30
31
# File 'lib/background_queue/server_lib/worker_balancer.rb', line 21

def check_offline
  
  workers_to_check = @mutex.synchronize { @offline_workers.clone }

  for worker in workers_to_check
    client = BackgroundQueue::ServerLib::WorkerClient.new(@server)
    if client.send_request(worker, build_poll_task, @server.config.secret)
      register_online(worker)
    end
  end
end

#finish_using_worker(worker, online) ⇒ Object



44
45
46
47
48
49
50
51
52
# File 'lib/background_queue/server_lib/worker_balancer.rb', line 44

def finish_using_worker(worker, online)
  @mutex.synchronize { 
    if online
      register_finish(worker)
    else
      register_offline(worker)
    end
  }
end

#get_next_workerObject

get the worker with the least number of connections using it



34
35
36
37
38
39
40
41
42
# File 'lib/background_queue/server_lib/worker_balancer.rb', line 34

def get_next_worker
  @mutex.synchronize { 
    worker = @available_workers.worker_list.first 
    unless worker.nil?
      register_start(worker)
    end
    worker
  }
end