Class: PerfectQueue::Engine

Inherits:
Object
  • Object
show all
Defined in:
lib/perfectqueue/engine.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(backend, log, conf) ⇒ Engine

Returns a new instance of Engine.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/perfectqueue/engine.rb', line 6

def initialize(backend, log, conf)
  @backend = backend
  @log = log

  @timeout = conf[:timeout]
  @poll_interval = conf[:poll_interval] || 1
  @expire = conf[:expire] || 345600

  num_workers = conf[:workers] || 1
  @workers = (1..num_workers).map {
    Worker.new(self, conf)
  }
  @available_workers = @workers.dup

  @finished = false
  @error = nil

  @mutex = Mutex.new
  @cond = ConditionVariable.new
end

Instance Attribute Details

#backendObject (readonly)

Returns the value of attribute backend.



27
28
29
# File 'lib/perfectqueue/engine.rb', line 27

def backend
  @backend
end

#errorObject (readonly)

Returns the value of attribute error.



29
30
31
# File 'lib/perfectqueue/engine.rb', line 29

def error
  @error
end

#logObject (readonly)

Returns the value of attribute log.



28
29
30
# File 'lib/perfectqueue/engine.rb', line 28

def log
  @log
end

Instance Method Details

#acquire_workerObject



94
95
96
97
98
99
100
101
102
# File 'lib/perfectqueue/engine.rb', line 94

def acquire_worker
  @mutex.synchronize {
    while @available_workers.empty?
      return nil if finished?
      @cond.wait(@mutex)
    end
    return @available_workers.pop
  }
end

#finished?Boolean

Returns:

  • (Boolean)


31
32
33
# File 'lib/perfectqueue/engine.rb', line 31

def finished?
  @finished
end

#release_worker(worker) ⇒ Object



104
105
106
107
108
109
110
111
# File 'lib/perfectqueue/engine.rb', line 104

def release_worker(worker)
  @mutex.synchronize {
    @available_workers.push worker
    if @available_workers.size == 1
      @cond.broadcast
    end
  }
end

#runObject



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/perfectqueue/engine.rb', line 35

def run
  @workers.each {|w|
    w.start
  }

  until finished?
    w = acquire_worker
    next unless w
    begin

      until finished?
        token, task = @backend.acquire(Time.now.to_i+@timeout)

        unless token
          sleep @poll_interval
          next
        end
        if task.created_at > Time.now.to_i+@expire
          @log.warn "canceling expired task id=#{task.id}"
          @backend.cancel(token)
          next
        end

        @log.info "acquired task id=#{task.id}"
        w.submit(token, task)
        w = nil
        break
      end

    ensure
      release_worker(w) if w
    end
  end
ensure
  @finished = true
end

#shutdownObject



87
88
89
90
91
92
# File 'lib/perfectqueue/engine.rb', line 87

def shutdown
  @finished = true
  @workers.each {|w|
    w.shutdown
  }
end

#stop(err = nil) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/perfectqueue/engine.rb', line 72

def stop(err=nil)
  @finished = true
  @error = error
  @workers.each {|w|
    w.stop
  }

  if err
    log.error err.to_s
    err.backtrace.each {|x|
      log.error "  #{x}"
    }
  end
end