Class: PerfectQueue::RDBBackend

Inherits:
Backend
  • Object
show all
Defined in:
lib/perfectqueue/backend/rdb.rb

Constant Summary collapse

MAX_SELECT_ROW =
32

Instance Method Summary collapse

Methods inherited from Backend

#close

Constructor Details

#initialize(uri, table) ⇒ RDBBackend

Returns a new instance of RDBBackend.



6
7
8
9
10
11
12
# File 'lib/perfectqueue/backend/rdb.rb', line 6

# Builds a backend bound to one table of the database at +uri+.
#
# 'sequel' is required here rather than at file load, so the gem is only
# needed once a backend is actually instantiated.
#
# @param uri   connection string handed to Sequel.connect; the text before
#              the first ':' is passed to init_db (defined elsewhere —
#              presumably selects adapter-specific schema setup; confirm there)
# @param table name of the queue table, interpolated into the SQL statements
def initialize(uri, table)
  require 'sequel'
  @uri = uri
  @table = table
  @db = Sequel.connect(@uri)
  scheme = @uri.split(':', 2).first
  init_db(scheme)
end

Instance Method Details

#acquire(timeout, now = Time.now.to_i) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/perfectqueue/backend/rdb.rb', line 58

# Claims the next due task from the queue table.
#
# Reads rows whose timeout is <= +now+ in ascending timeout order, at most
# MAX_SELECT_ROW per query. Rows with a NULL created_at (finished/canceled
# tasks) are deleted along the way. For every other row an UPDATE guarded by
# the timeout value that was just read is attempted; the first row for which
# that UPDATE affects at least one record is returned.
#
# @param timeout value written to the claimed row's timeout column
# @param now     cutoff compared against the timeout column
#                (defaults to the current epoch seconds)
# @return [Array, nil] +[id, Task]+ for the claimed row, or nil when a batch
#   shorter than MAX_SELECT_ROW was scanned without claiming anything
def acquire(timeout, now=Time.now.to_i)
  connect do
    while true
      scanned = 0
      @db.fetch("SELECT id, timeout, data, created_at FROM `#{@table}` WHERE timeout <= ? ORDER BY timeout ASC LIMIT #{MAX_SELECT_ROW};", now) do |row|
        if row[:created_at]
          # The "AND timeout=?" guard makes the UPDATE a no-op when the row's
          # timeout no longer matches what the SELECT returned.
          claimed = @db["UPDATE `#{@table}` SET timeout=? WHERE id=? AND timeout=?;", timeout, row[:id], row[:timeout]].update
          # `return` inside these blocks leaves #acquire itself.
          return row[:id], Task.new(row[:id], row[:created_at], row[:data]) if claimed > 0
        else
          # finished/canceled task
          @db["DELETE FROM `#{@table}` WHERE id=?;", row[:id]].delete
        end

        scanned += 1
      end

      # A short batch means there are no further due rows to look at.
      return nil if scanned < MAX_SELECT_ROW
    end
  end
end

#cancel(id, delete_timeout = 3600, now = Time.now.to_i) ⇒ Object



101
102
103
# File 'lib/perfectqueue/backend/rdb.rb', line 101

# Cancels a task by delegating to #finish with the same arguments.
#
# @param id             id of the task to cancel
# @param delete_timeout forwarded unchanged to #finish (default 3600)
# @param now            forwarded unchanged to #finish
#                       (defaults to the current epoch seconds)
# @return whatever #finish returns for these arguments
def cancel(id, delete_timeout=3600, now=Time.now.to_i)
  finish(id, delete_timeout, now)
end

#finish(id, delete_timeout = 3600, now = Time.now.to_i) ⇒ Object



84
85
86
87
88
89
# File 'lib/perfectqueue/backend/rdb.rb', line 84

# Marks a task as finished.
#
# Sets created_at to NULL and moves the row's timeout to
# +now + delete_timeout+, but only for a row with the given id whose
# created_at is still non-NULL.
#
# @param id             id of the task row
# @param delete_timeout added to +now+ to form the new timeout (default 3600)
# @param now            base time (defaults to the current epoch seconds)
# @return [Boolean] true when the UPDATE affected at least one row
def finish(id, delete_timeout=3600, now=Time.now.to_i)
  connect do
    expires_at = now + delete_timeout
    affected = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL WHERE id=? AND created_at IS NOT NULL;", expires_at, id].update
    # Explicit return: exits #finish from inside the connect block.
    return affected > 0
  end
end

#list(&block) ⇒ Object



50
51
52
53
54
# File 'lib/perfectqueue/backend/rdb.rb', line 50

# Iterates over every row whose created_at is non-NULL, in ascending
# timeout order.
#
# @yield [id, created_at, data, timeout] the four column values of each row
# @return whatever @db.fetch returns
def list(&block)
  @db.fetch("SELECT id, timeout, data, created_at FROM `#{@table}` WHERE created_at IS NOT NULL ORDER BY timeout ASC;") do |task_row|
    # Note the yield order differs from the SELECT column order.
    yield(*task_row.values_at(:id, :created_at, :data, :timeout))
  end
end

#submit(id, data, time = Time.now.to_i) ⇒ Object



105
106
107
108
109
110
111
112
113
114
# File 'lib/perfectqueue/backend/rdb.rb', line 105

# Inserts a new task row.
#
# +time+ is stored both as the row's timeout and as its created_at.
#
# @param id   id of the new task
# @param data payload stored in the data column
# @param time value for timeout and created_at
#             (defaults to the current epoch seconds)
# @return [true, nil] true when the INSERT went through; nil when it raised
#   Sequel::DatabaseError (presumably a duplicate id, but any database error
#   is reported the same way)
def submit(id, data, time=Time.now.to_i)
  connect do
    begin
      @db["INSERT INTO `#{@table}` (id, timeout, data, created_at) VALUES (?, ?, ?, ?);", id, time, data, time].insert
      # Explicit return: exits #submit from inside the connect block.
      return true
    rescue Sequel::DatabaseError
      return nil
    end
  end
end

#update(id, timeout) ⇒ Object



91
92
93
94
95
96
97
98
99
# File 'lib/perfectqueue/backend/rdb.rb', line 91

# Writes a new timeout onto a task row whose created_at is non-NULL.
#
# @param id      id of the task row
# @param timeout new value for the timeout column
# @return [nil]
# @raise [CanceledError] when the UPDATE affected no row, i.e. there is no
#   row with this id that still has a non-NULL created_at
def update(id, timeout)
  connect do
    touched = @db["UPDATE `#{@table}` SET timeout=? WHERE id=? AND created_at IS NOT NULL;", timeout, id].update
    raise CanceledError, "Task id=#{id} is canceled." if touched <= 0
    # Explicit return: exits #update from inside the connect block.
    return nil
  end
end