Class: ThreadedInMemoryQueue::Master

Inherits:
Object
  • Object
show all
Includes:
Timeout
Defined in:
lib/threaded_in_memory_queue/master.rb

Constant Summary collapse

DEFAULT_TIMEOUT =

seconds, 1 minute

60
DEFAULT_SIZE =
16

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Timeout

#timeout

Constructor Details

#initialize(options = {}) ⇒ Master

Returns a new instance of Master.



9
10
11
12
13
14
15
# File 'lib/threaded_in_memory_queue/master.rb', line 9

# Sets up a master with a fresh job queue and an empty worker list.
# No workers are created here.
#
# @param options [Hash] optional settings; a missing or falsy entry falls
#   back to the default listed below
# @option options [Integer] :size    how many workers #start creates
#   (default: DEFAULT_SIZE)
# @option options [Object]  :timeout value handed to each Worker as its
#   +timeout:+ option (default: DEFAULT_TIMEOUT)
# @option options [Object]  :logger  logger exposed through #logger
#   (default: ThreadedInMemoryQueue.logger)
def initialize(options = {})
  size_opt    = options[:size]
  timeout_opt = options[:timeout]
  logger_opt  = options[:logger]

  @queue   = Queue.new
  @size    = size_opt    || DEFAULT_SIZE
  @timeout = timeout_opt || DEFAULT_TIMEOUT
  # The library-wide logger is only consulted when none was supplied.
  @logger  = logger_opt  || ThreadedInMemoryQueue.logger
  @workers = []
end

Instance Attribute Details

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



4
5
6
# File 'lib/threaded_in_memory_queue/master.rb', line 4

# Read-only access to the logger held by this master.
#
# @return [Object] the current value of @logger
def logger
  return @logger
end

#workers ⇒ Object (readonly)

Returns the value of attribute workers.



4
5
6
# File 'lib/threaded_in_memory_queue/master.rb', line 4

# Read-only access to the worker list held by this master.
#
# @return [Object] the current value of @workers (the same object, not a copy)
def workers
  return @workers
end

Instance Method Details

#alive? ⇒ Boolean

Returns:

  • (Boolean)


50
51
52
53
# File 'lib/threaded_in_memory_queue/master.rb', line 50

# Reports whether at least one worker is still alive.
#
# @return [Boolean] +true+ when any worker answers truthily to +alive?+,
#   +false+ otherwise (including when there are no workers at all)
def alive?
  # any? is already false for an empty list and always yields a strict
  # true/false, so the predicate no longer leaks the first live worker
  # object (or nil) to callers.
  workers.any?(&:alive?)
end

#enqueue(job, *json) ⇒ Object

Raises:

  • (NoWorkersError)

44
45
46
47
48
# File 'lib/threaded_in_memory_queue/master.rb', line 44

# Puts a job and its arguments on the queue.
#
# @param job  [Object] the job to run
# @param json [Array]  remaining arguments, queued together with the job
# @return [true] always, once the entry has been queued
# @raise [NoWorkersError] when #alive? is falsy; nothing is queued in that case
def enqueue(job, *json)
  raise NoWorkersError unless alive?

  # One queue entry is a two-element array: the job, then its argument list.
  entry = [job, json]
  @queue.enq(entry)
  true
end

#join ⇒ Object



23
24
25
26
# File 'lib/threaded_in_memory_queue/master.rb', line 23

# Calls +join+ on every worker, in list order.
#
# @return [self] so calls can be chained
def join
  workers.each(&:join)
  self
end

#poison ⇒ Object



28
29
30
31
# File 'lib/threaded_in_memory_queue/master.rb', line 28

# Calls +poison+ on every worker, in list order.
#
# @return [self] so calls can be chained
def poison
  workers.each(&:poison)
  self
end

#start ⇒ Object



17
18
19
20
21
# File 'lib/threaded_in_memory_queue/master.rb', line 17

# Starts the worker pool unless a worker is already alive.
#
# Creates @size workers, each built with the shared queue and the
# configured timeout, calls +start+ on each, and stores the result of
# that call in the worker list.
#
# @return [self] so calls can be chained
def start
  return self if alive?

  # No worker is alive at this point, so whatever is still in the list is
  # left over from an earlier run. Drop those entries (in place, so the array
  # returned by #workers stays the same object) instead of appending to them;
  # otherwise a restart would leave more than @size entries and later
  # join/poison calls would keep visiting finished workers.
  @workers.clear
  @size.times { @workers << Worker.new(@queue, timeout: @timeout).start }
  self
end

#stop(timeout = 10) ⇒ Object



33
34
35
36
37
38
39
40
41
42
# File 'lib/threaded_in_memory_queue/master.rb', line 33

# Poisons the workers, then waits for them to finish.
#
# The wait (polling #alive? every 0.1 s, then a final #join) runs inside the
# +timeout+ helper, which receives the given limit and a description of
# what is being waited for.
#
# @param timeout [Object] limit passed through to the +timeout+ helper
#   (default: 10)
# @return [self] so calls can be chained
def stop(timeout = 10)
  poison
  # With parentheses this is the timeout *method*, not the parameter of the
  # same name; the parameter is its first argument.
  timeout(timeout, "waiting for workers to stop") do
    sleep 0.1 while alive?
    join
  end
  self
end