Class: PerfectQueue::Engine

Inherits:
Object
  • Object
show all
Defined in:
lib/perfectqueue/engine.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(backend, log, conf) ⇒ Engine

Returns a new instance of Engine.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/perfectqueue/engine.rb', line 6

# Sets up an engine: stores its collaborators, reads tuning values from
# +conf+, builds the worker pool and the synchronization primitives that
# guard the list of idle workers.
#
# @param backend [Object] task backend, exposed through #backend
# @param log [Object] logger, exposed through #log
# @param conf [Hash] settings; reads :timeout (no default),
#   :poll_interval (default 1), :expire (default 345600) and
#   :workers (default 1). The same hash is handed to every Worker.
def initialize(backend, log, conf)
  @backend = backend
  @log     = log

  @timeout       = conf[:timeout]
  @poll_interval = conf[:poll_interval] || 1
  # 4 days expressed in seconds (345600).
  @expire        = conf[:expire] || (4 * 24 * 60 * 60)

  worker_count = conf[:workers] || 1
  @workers = (1..worker_count).map do
    Worker.new(self, conf)
  end
  # Every worker starts out idle; the copy keeps @workers itself intact
  # while the idle list is pushed to and popped from.
  @available_workers = @workers.dup

  @error    = nil
  @finished = false

  @mutex = Mutex.new
  @cond  = ConditionVariable.new
end

Instance Attribute Details

#backend ⇒ Object (readonly)

Returns the value of attribute backend.



27
28
29
# File 'lib/perfectqueue/engine.rb', line 27

# Reader for the backend object passed to the constructor.
#
# @return [Object] the value stored in @backend
def backend
  @backend
end

#error ⇒ Object (readonly)

Returns the value of attribute error.



29
30
31
# File 'lib/perfectqueue/engine.rb', line 29

# Reader for the error recorded on this engine.
#
# @return [Object, nil] the value stored in @error; nil right after
#   construction
def error
  @error
end

#log ⇒ Object (readonly)

Returns the value of attribute log.



28
29
30
# File 'lib/perfectqueue/engine.rb', line 28

# Reader for the logger passed to the constructor.
#
# @return [Object] the value stored in @log
def log
  @log
end

Instance Method Details

#acquire_worker ⇒ Object



95
96
97
98
99
100
101
102
103
# File 'lib/perfectqueue/engine.rb', line 95

# Takes an idle worker out of the pool, blocking on the condition variable
# while the pool is empty and the engine is still running.
#
# @return [Object, nil] the most recently released idle worker, or nil when
#   the pool is empty and the engine has finished
def acquire_worker
  @mutex.synchronize do
    # Sleep only while there is nothing to hand out AND work may still
    # arrive; the mutex is released for the duration of each wait.
    @cond.wait(@mutex) while @available_workers.empty? && !finished?

    # An empty pool here means the engine finished: Array#pop then yields nil.
    @available_workers.pop
  end
end

#finished? ⇒ Boolean

Returns:

  • (Boolean)


31
32
33
# File 'lib/perfectqueue/engine.rb', line 31

# Tells whether the engine has been marked as finished.
#
# @return [Boolean] the value stored in @finished; false right after
#   construction
def finished?
  @finished
end

#release_worker(worker) ⇒ Object



105
106
107
108
109
110
111
112
# File 'lib/perfectqueue/engine.rb', line 105

# Puts a worker back into the idle pool and wakes up waiters when the pool
# goes from empty to non-empty.
#
# @param worker [Object] the worker that became idle
def release_worker(worker)
  @mutex.synchronize do
    @available_workers << worker
    # A size of exactly one means the pool was empty a moment ago, which is
    # the only state in which a thread can be waiting on the condition.
    @cond.broadcast if @available_workers.size == 1
  end
end

#run ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/perfectqueue/engine.rb', line 35

# Main dispatch loop: starts every worker, then keeps pairing an idle worker
# with a task acquired from the backend until the engine is finished.
#
# Tasks whose created_at is more than @expire older than the current time
# are cancelled on the backend instead of being handed to a worker.
# On every exit path, including an exception, the engine is marked finished.
def run
  @workers.each {|w|
    w.start
  }

  until finished?
    w = acquire_worker
    # acquire_worker yields nil when no worker is idle and the engine has
    # finished; the outer condition then ends the loop.
    next unless w
    begin

      # Poll the backend until this worker has been given a task or the
      # engine finishes.
      until finished?
        now = Time.now.to_i
        # NOTE(review): now+@timeout is presumably the deadline of the
        # lock/lease taken on the task — confirm against the backend.
        token, task = @backend.acquire(now+@timeout)

        # No token: nothing to run right now, wait before polling again.
        unless token
          sleep @poll_interval
          next
        end
        # NOTE(review): assumes created_at is in epoch seconds like `now` —
        # confirm against the task class.
        if task.created_at < now-@expire
          @log.warn "canceling expired task id=#{task.id}"
          @backend.cancel(token)
          next
        end

        @log.info "acquired task id=#{task.id}"
        w.submit(token, task)
        # The worker now has a task; clearing w keeps the ensure below
        # from putting it back into the idle pool.
        w = nil
        break
      end

    ensure
      # Still set only when no task was submitted (engine finished or an
      # exception was raised): return the worker to the idle pool.
      release_worker(w) if w
    end
  end
ensure
  @finished = true
end

#shutdown ⇒ Object



88
89
90
91
92
93
# File 'lib/perfectqueue/engine.rb', line 88

# Marks the engine as finished and calls +shutdown+ on every worker.
#
# @return [Array] the worker list
def shutdown
  @finished = true
  @workers.each(&:shutdown)
end

#stop(err = nil) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/perfectqueue/engine.rb', line 73

# Stops the engine: marks it finished, records the given error, calls +stop+
# on every worker and, when an error was given, logs it with its backtrace.
#
# @param err [Exception, nil] the failure that caused the stop, or nil for
#   a stop without an error
def stop(err=nil)
  @finished = true
  # Store the caller's error. Assigning the `error` reader here would only
  # write @error's current value back to itself and lose the failure.
  @error = err
  @workers.each {|w|
    w.stop
  }

  if err
    @log.error "#{err.class}: #{err}"
    # Exception#backtrace is nil for an exception that was never raised.
    (err.backtrace || []).each {|x|
      @log.error "  #{x}"
    }
  end
end