Class: CukeForker::WorkerQueue

Inherits:
Object
  • Object
show all
Includes:
Observable
Defined in:
lib/cukeforker/worker_queue.rb

Defined Under Namespace

Classes: NotStartedError

Instance Method Summary collapse

Constructor Details

#initialize(max, delay, fail_fast = false) ⇒ WorkerQueue

Returns a new instance of WorkerQueue.



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/cukeforker/worker_queue.rb', line 5

def initialize(max, delay, fail_fast=false)
  @max = max
  @delay = delay
  @fail_fast = fail_fast

  if @max < 0
    raise ArgumentError, "max workers cannot be negative, got #{@max.inspect}"
  end

  unless @delay.kind_of?(Numeric)
    raise ArgumentError, "delay must be Numeric, got #{@delay.inspect}:#{@delay.class}"
  end

  @pending = []
  @running = []
  @finished = []
end

Instance Method Details

#add(worker) ⇒ Object



27
28
29
# File 'lib/cukeforker/worker_queue.rb', line 27

def add(worker)
  @pending << worker
end

#add_observer(observer) ⇒ Object



105
106
107
108
# File 'lib/cukeforker/worker_queue.rb', line 105

def add_observer(observer)
  @pending.each { |e| e.add_observer observer }
  super
end

#backed_up?Boolean

Returns:

  • (Boolean)


23
24
25
# File 'lib/cukeforker/worker_queue.rb', line 23

def backed_up?
  @pending.any?
end

#empty?Boolean

Returns:

  • (Boolean)


75
76
77
# File 'lib/cukeforker/worker_queue.rb', line 75

def empty?
  @running.empty?
end

#etaObject



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/cukeforker/worker_queue.rb', line 83

def eta
  pending  = @pending.size
  finished = @finished.size
  running  = @running.size

  remaining = pending + running

  if finished == 0
    result = [Time.now, remaining, finished]
    fire :on_eta, *result
  else
    seconds_per_child = (Time.now - start_time) / finished.to_f
    eta = Time.now + (seconds_per_child * remaining)

    result = [eta, remaining, finished]

    fire :on_eta, *result
  end

  result
end

#fillObject



50
51
52
53
54
55
# File 'lib/cukeforker/worker_queue.rb', line 50

def fill
  while backed_up? and not full?
    worker = @pending.shift
    start worker
  end
end

#full?Boolean

Returns:

  • (Boolean)


71
72
73
# File 'lib/cukeforker/worker_queue.rb', line 71

def full?
  @max != 0 && size == @max
end

#has_failures?Boolean

Returns:

  • (Boolean)


79
80
81
# File 'lib/cukeforker/worker_queue.rb', line 79

def has_failures?
  @finished.any? { |w| w.failed? }
end

#poll(seconds = nil) ⇒ Object



57
58
59
60
61
62
63
64
65
# File 'lib/cukeforker/worker_queue.rb', line 57

def poll(seconds = nil)
  finished = @running.select { |w| w.finished? }

  if finished.empty?
    sleep seconds if seconds
  else
    finished.each { |w| finish w }
  end
end

#process(poll_interval = nil) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
# File 'lib/cukeforker/worker_queue.rb', line 31

def process(poll_interval = nil)
  @start_time = Time.now

  while backed_up?
    fill
    eta
    poll poll_interval while full?
  end

  # yay, no more pending workers
end

#sizeObject



67
68
69
# File 'lib/cukeforker/worker_queue.rb', line 67

def size
  @running.size
end

#wait_until_finished(poll_interval = nil) ⇒ Object



43
44
45
46
47
48
# File 'lib/cukeforker/worker_queue.rb', line 43

def wait_until_finished(poll_interval = nil)
  until empty?
    poll poll_interval
    eta
  end
end