Class: Threaded::Master

Inherits:
Object
  • Object
show all
Defined in:
lib/threaded/master.rb

Constant Summary collapse

DEFAULT_TIMEOUT =

Default timeout for #stop, in seconds.

10
DEFAULT_SIZE =
16

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Master

Returns a new instance of Master.



7
8
9
10
11
12
13
14
# File 'lib/threaded/master.rb', line 7

# Sets up a new master with an empty job queue, a mutex, an empty worker
# list and the stopping flag cleared.
#
# @param options [Hash] optional settings
# @option options [Object] :size   stored as @max; DEFAULT_SIZE is used when
#   the key is missing or its value is nil/false
# @option options [Object] :logger stored as @logger; Threaded.logger is used
#   when the key is missing or its value is nil/false
def initialize(options = {})
  # Caller-supplied settings; `||` means a nil/false value also selects the default.
  @max    = options[:size] || DEFAULT_SIZE
  @logger = options[:logger] || Threaded.logger

  # Fresh runtime state.
  @workers  = []
  @stopping = false
  @mutex    = Mutex.new
  @queue    = Queue.new
end

Instance Attribute Details

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



5
6
7
# File 'lib/threaded/master.rb', line 5

# Read-only accessor for the logger held by this master.
#
# @return [Object] the current value of @logger
def logger
  @logger
end

#workers ⇒ Object (readonly)

Returns the value of attribute workers.



5
6
7
# File 'lib/threaded/master.rb', line 5

# Read-only accessor for the worker list.
#
# Returns the @workers object itself, not a copy, so in-place mutation by a
# caller changes the master's own list.
#
# @return [Object] the current value of @workers
def workers
  @workers
end

Instance Method Details

#enqueue(job, *json) ⇒ Object

Raises:

  • (NoWorkersError)

16
17
18
19
20
21
22
# File 'lib/threaded/master.rb', line 16

# Queues a job together with its extra arguments.
#
# The pair [job, json] is pushed onto the queue first. A worker is then
# requested via new_worker when needs_workers? is true and the queue is not
# empty. Finally the call fails unless at least one worker reports alive?.
# Note that the job is already on the queue when NoWorkersError is raised.
#
# @param job  [Object] the job to queue
# @param json [Array]  remaining arguments, queued alongside the job
# @return [true]
# @raise [NoWorkersError] when no worker is alive
def enqueue(job, *json)
  payload = [job, json]
  @queue.push(payload)

  new_worker if needs_workers? && !@queue.empty?
  raise NoWorkersError if workers.none?(&:alive?)

  true
end

#size ⇒ Object



42
43
44
# File 'lib/threaded/master.rb', line 42

# Number of entries currently in the worker list.
#
# @return [Integer] element count of @workers
def size
  @workers.length
end

#start ⇒ Object



24
25
26
27
# File 'lib/threaded/master.rb', line 24

# Starts the master by delegating to new_workers with @max and `true`.
# NOTE(review): the meaning of the second argument is defined by new_workers,
# which is not visible here — confirm before changing it.
#
# @return [self] the receiver, so calls can be chained
def start
  new_workers(@max, true)
  self
end

#stop(timeout = DEFAULT_TIMEOUT) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/threaded/master.rb', line 29

# Stops the master: marks it as stopping, poisons every worker, then waits
# (inside the `timeout` helper) until every worker is dead and joined.
# The whole sequence runs while holding @mutex.
#
# @param timeout [Object] limit passed through to the `timeout` helper
#   (defaults to DEFAULT_TIMEOUT)
# @return [self] the receiver
def stop(timeout = DEFAULT_TIMEOUT)
  @mutex.synchronize do
    @stopping = true
    workers.each(&:poison)

    # The parenthesised call invokes the `timeout` helper method; the bare
    # `timeout` argument is this method's parameter.
    timeout(timeout, "waiting for workers to stop") do
      while workers.any?
        # `join` is only called on workers that are already dead; a truthy
        # result removes that worker from the list.
        workers.reject! { |w| w.join if w.dead? }

        # Hand the scheduler to other threads between polls instead of
        # spinning, so the workers being waited on get a chance to finish.
        Thread.pass if workers.any?
      end
    end
  end

  self
end

#stopping? ⇒ Boolean

Returns:

  • (Boolean)


46
47
48
# File 'lib/threaded/master.rb', line 46

# Reports whether the master has been asked to stop.
#
# @return [Boolean] the current value of @stopping
def stopping?
  @stopping
end