Class: Threasy::Work

Inherits:
Object
  • Object
show all
Defined in:
lib/threasy/work.rb

Defined Under Namespace

Classes: TimeoutQueue, Worker

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Work

Returns a new instance of Work.



5
6
7
8
9
# File 'lib/threasy/work.rb', line 5

# Sets up an idle work manager: an empty job queue, an empty worker pool,
# and the mutex that #sync locks.
def initialize
  # Lock used by #sync (and therefore by #check_workers).
  @semaphore = Mutex.new
  # Registered workers; #add_worker adds to this set.
  @pool = Set.new
  # Pending jobs; #enqueue pushes here and #grab pops from here.
  @queue = TimeoutQueue.new
end

Instance Attribute Details

#pool ⇒ Object (readonly)

Returns the value of attribute pool.



3
4
5
# File 'lib/threasy/work.rb', line 3

# Reader for the worker pool.
#
# @return [Object] the current value of @pool; the constructor shown on this
#   page assigns a Set, and #add_worker adds Worker instances to it
def pool
  @pool
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



3
4
5
# File 'lib/threasy/work.rb', line 3

# Reader for the job queue.
#
# @return [Object] the current value of @queue; the constructor shown on this
#   page assigns a TimeoutQueue
def queue
  @queue
end

Instance Method Details

#add_worker(size) ⇒ Object



40
41
42
43
44
45
46
47
# File 'lib/threasy/work.rb', line 40

# Creates a Worker bound to this manager, registers it in the pool, and
# starts it.
#
# Deliberately not wrapped in +sync+: the caller visible on this page,
# #check_workers, invokes this while already inside +sync+, and a Ruby Mutex
# cannot be locked again by the thread that already holds it.
#
# @param size [Integer] forwarded as the second argument to Worker.new;
#   #check_workers passes the pool size measured before this worker is added
# @return [Object] whatever Worker#work returns
def add_worker(size)
  log("Adding new worker to pool")
  Worker.new(self, size).tap { |worker| pool.add(worker) }.work
end

#check_workers ⇒ Object



29
30
31
32
33
34
35
36
37
38
# File 'lib/threasy/work.rb', line 29

# Decides, under the +sync+ lock, whether one more worker should be started.
#
# A worker is added only while the pool is below +max_workers+, and then only
# if the pool is empty or the queue holds more than +max_workers+ items.
#
# @return [Object, nil] the result of #add_worker when one is started,
#   otherwise nil
def check_workers
  sync do
    workers = pool.size
    backlog = queue.size
    log "Checking workers. Pool: #{workers}, Max: #{max_workers}, Queue: #{backlog}"
    # Short-circuit order matters: capacity first, then empty pool, then backlog.
    wanted = workers < max_workers && (workers.zero? || backlog > max_workers)
    add_worker(workers) if wanted
  end
end

#clear ⇒ Object



49
50
51
# File 'lib/threasy/work.rb', line 49

# Empties the job queue by delegating to +queue.clear+.
# Workers already in the pool are not touched by this method.
#
# @return [Object] whatever +queue.clear+ returns
def clear
  queue.clear
end

#enqueue(job = nil, &block) ⇒ Object (also known as: enqueue_block)



11
12
13
# File 'lib/threasy/work.rb', line 11

# Queues one unit of work, then re-evaluates whether a worker is needed.
#
# @param job [Object, nil] the item to queue when no block is given
# @param block [Proc] when given, it is queued instead of +job+
# @return [Object] whatever +queue.push+ returns
def enqueue(job = nil, &block)
  # A supplied block always wins over the positional job.
  payload = block || job
  pushed = queue.push(payload)
  check_workers
  pushed
end

#grab ⇒ Object



21
22
23
# File 'lib/threasy/work.rb', line 21

# Takes the next item off the job queue by delegating to +queue.pop+.
# Blocking or timeout behavior is decided by the queue's own +pop+, which is
# not shown here.
#
# @return [Object] whatever +queue.pop+ returns
def grab
  queue.pop
end

#log(msg) ⇒ Object



53
54
55
# File 'lib/threasy/work.rb', line 53

# Sends +msg+ to the Threasy logger at debug level.
#
# @param msg [Object] passed unchanged to +Threasy.logger.debug+
# @return [Object] whatever the logger's +debug+ call returns
def log(msg)
  Threasy.logger.debug(msg)
end

#max_workers ⇒ Object



25
26
27
# File 'lib/threasy/work.rb', line 25

# Worker limit used by #check_workers.
# Read from +Threasy.config.max_workers+ on every call; nothing is cached here.
#
# @return [Object] the configured value (compared against integer sizes in
#   #check_workers)
def max_workers
  Threasy.config.max_workers
end

#sync(&block) ⇒ Object



17
18
19
# File 'lib/threasy/work.rb', line 17

# Runs the given block while holding this instance's mutex.
#
# The lock is a plain Mutex, which is not reentrant: calling +sync+ from
# inside a +sync+ block on the same thread raises ThreadError.
#
# @param block [Proc] the code to run under the lock
# @return [Object] the block's return value
# @raise [ThreadError] if no block is given or the current thread already
#   holds the lock
def sync(&block)
  # Parenthesized so `&` is unambiguously a block pass, not a binary operator.
  @semaphore.synchronize(&block)
end