Class: CukeForker::WorkerQueue

Inherits:
Object
  • Object
show all
Includes:
Observable
Defined in:
lib/cukeforker/worker_queue.rb

Defined Under Namespace

Classes: NotStartedError

Instance Method Summary collapse

Constructor Details

#initialize(max) ⇒ WorkerQueue



5
6
7
8
9
10
11
12
13
14
15
# File 'lib/cukeforker/worker_queue.rb', line 5

def initialize(max)
  @max = max

  if @max < 0
    raise ArgumentError, "max workers cannot be negative, got #{@max.inspect}"
  end

  @pending = []
  @running = []
  @finished = []
end

Instance Method Details

#add(worker) ⇒ Object



21
22
23
# File 'lib/cukeforker/worker_queue.rb', line 21

def add(worker)
  @pending << worker
end

#add_observer(observer) ⇒ Object



99
100
101
102
# File 'lib/cukeforker/worker_queue.rb', line 99

def add_observer(observer)
  @pending.each { |e| e.add_observer observer }
  super
end

#backed_up?Boolean



17
18
19
# File 'lib/cukeforker/worker_queue.rb', line 17

def backed_up?
  @pending.any?
end

#empty?Boolean



69
70
71
# File 'lib/cukeforker/worker_queue.rb', line 69

def empty?
  @running.empty?
end

#etaObject



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/cukeforker/worker_queue.rb', line 77

def eta
  pending  = @pending.size
  finished = @finished.size
  running  = @running.size

  remaining = pending + running

  if finished == 0
    result = [Time.now, remaining, finished]
    fire :on_eta, *result
  else
    seconds_per_child = (Time.now - start_time) / finished.to_f
    eta = Time.now + (seconds_per_child * remaining)

    result = [eta, remaining, finished]

    fire :on_eta, *result
  end

  result
end

#fillObject



44
45
46
47
48
49
# File 'lib/cukeforker/worker_queue.rb', line 44

def fill
  while backed_up? and not full?
    worker = @pending.shift
    start worker
  end
end

#full?Boolean



65
66
67
# File 'lib/cukeforker/worker_queue.rb', line 65

def full?
  @max != 0 && size == @max
end

#has_failures?Boolean



73
74
75
# File 'lib/cukeforker/worker_queue.rb', line 73

def has_failures?
  @finished.any? { |w| w.failed? }
end

#poll(seconds = nil) ⇒ Object



51
52
53
54
55
56
57
58
59
# File 'lib/cukeforker/worker_queue.rb', line 51

def poll(seconds = nil)
  finished = @running.select { |w| w.finished? }

  if finished.empty?
    sleep seconds if seconds
  else
    finished.each { |w| finish w }
  end
end

#process(poll_interval = nil) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
# File 'lib/cukeforker/worker_queue.rb', line 25

def process(poll_interval = nil)
  @start_time = Time.now

  while backed_up?
    fill
    eta
    poll poll_interval while full?
  end

  # yay, no more pending workers
end

#sizeObject



61
62
63
# File 'lib/cukeforker/worker_queue.rb', line 61

def size
  @running.size
end

#wait_until_finished(poll_interval = nil) ⇒ Object



37
38
39
40
41
42
# File 'lib/cukeforker/worker_queue.rb', line 37

def wait_until_finished(poll_interval = nil)
  until empty?
    poll poll_interval
    eta
  end
end