Class: PerfectQueue::RDBBackend

Inherits:
Backend
  • Object
show all
Defined in:
lib/perfectqueue/backend/rdb.rb

Constant Summary collapse

MAX_SELECT_ROW =
8
MAX_RETRY =

KEEPALIVE = 10

10

Instance Method Summary collapse

Methods inherited from Backend

#close

Constructor Details

#initialize(uri, table, config = {}) ⇒ RDBBackend

Returns a new instance of RDBBackend.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/perfectqueue/backend/rdb.rb', line 6

def initialize(uri, table, config={})
  require 'sequel'
  require 'uri'

  @uri = uri
  @table = table

  u = URI.parse(@uri)
  options = {
    max_connections: 1,
    user: u.user,
    password: u.password,
    host: u.host,
    port: u.port ? u.port.to_i : 3306
  }
  options[:sslca] = config[:sslca] if config[:sslca]
  db_name = @uri.path.split('/')[1]
  @db = Sequel.mysql2(db_name, options)

  #@last_time = Time.now.to_i
  @mutex = Mutex.new
  #init_db(@uri.split('//',2)[0])
  connect {
    # connection test
  }
  @sql = <<SQL
SELECT id, timeout, data, created_at, resource, max_running/running AS weight
FROM `#{@table}`
LEFT JOIN (
SELECT resource AS res, COUNT(1) AS running
FROM `#{@table}` AS T
WHERE timeout > ? AND created_at IS NOT NULL AND resource IS NOT NULL
GROUP BY resource
) AS R ON resource = res
WHERE timeout <= ? AND (max_running-running IS NULL OR max_running-running > 0)
ORDER BY weight IS NOT NULL, weight DESC, timeout ASC
LIMIT #{MAX_SELECT_ROW}
SQL
  # sqlite doesn't support SELECT ... FOR UPDATE but
  # sqlite doesn't need it because the db is not shared
  unless @uri.split('//',2)[0].to_s.include?('sqlite')
    @sql << 'FOR UPDATE'
  end
end

Instance Method Details

#acquire(timeout, now = Time.now.to_i) ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/perfectqueue/backend/rdb.rb', line 110

def acquire(timeout, now=Time.now.to_i)
  connect {
    while true
      rows = 0
      @db.transaction do
        @db.fetch(@sql, now, now) {|row|
          unless row[:created_at]
            # finished/canceled task
            @db["DELETE FROM `#{@table}` WHERE id=?;", row[:id]].delete

          else
            ## optimistic lock is not needed because the row is locked for update
            #n = @db["UPDATE `#{@table}` SET timeout=? WHERE id=? AND timeout=?", timeout, row[:id], row[:timeout]].update
            n = @db["UPDATE `#{@table}` SET timeout=? WHERE id=?", timeout, row[:id]].update
            if n > 0
              return row[:id], Task.new(row[:id], row[:created_at], row[:data], row[:resource])
            end
          end

          rows += 1
        }
      end
      break nil if rows < MAX_SELECT_ROW
    end
  }
end

#cancel(id, delete_timeout = 3600, now = Time.now.to_i) ⇒ Object



154
155
156
# File 'lib/perfectqueue/backend/rdb.rb', line 154

def cancel(id, delete_timeout=3600, now=Time.now.to_i)
  finish(id, delete_timeout, now)
end

#create_tablesObject



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/perfectqueue/backend/rdb.rb', line 51

def create_tables
  sql = ''
  sql << "CREATE TABLE IF NOT EXISTS `#{@table}` ("
  sql << "  id VARCHAR(256) NOT NULL,"
  sql << "  timeout INT NOT NULL,"
  sql << "  data BLOB NOT NULL,"
  sql << "  created_at INT,"
  sql << "  resource VARCHAR(256),"
  sql << "  max_running INT,"
  sql << "  PRIMARY KEY (id)"
  sql << ");"
  # TODO index
  connect {
    @db.run sql
  }
end

#finish(id, delete_timeout = 3600, now = Time.now.to_i) ⇒ Object



137
138
139
140
141
142
# File 'lib/perfectqueue/backend/rdb.rb', line 137

def finish(id, delete_timeout=3600, now=Time.now.to_i)
  connect {
    n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND created_at IS NOT NULL;", now+delete_timeout, id].update
    return n > 0
  }
end

#list(&block) ⇒ Object



100
101
102
103
104
# File 'lib/perfectqueue/backend/rdb.rb', line 100

def list(&block)
  @db.fetch("SELECT id, timeout, data, created_at, resource FROM `#{@table}` WHERE created_at IS NOT NULL ORDER BY timeout ASC;") {|row|
    yield row[:id], row[:created_at], row[:data], row[:timeout], row[:resource]
  }
end

#submit(id, data, time = Time.now.to_i, resource = nil, max_running = nil) ⇒ Object



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/perfectqueue/backend/rdb.rb', line 158

def submit(id, data, time=Time.now.to_i, resource=nil, max_running=nil)
  connect {
    begin
      data = Sequel::SQL::Blob.new(data)
      n = @db["INSERT INTO `#{@table}` (id, timeout, data, created_at, resource, max_running) VALUES (?, ?, ?, ?, ?, ?);", id, time, data, time, resource, max_running].insert
      return true
    rescue Sequel::DatabaseError => e
      # Sequel doesn't provide error classes to distinguish duplicate-entry from other
      # errors like connectivity error. This code assumes the driver is mysql2 and
      # the error message is "Mysql::ServerError::DupEntry: Duplicate entry"
      if /: Duplicate entry/ =~ e.to_s
        return nil
      end
      raise e
    end
  }
end

#update(id, timeout) ⇒ Object



144
145
146
147
148
149
150
151
152
# File 'lib/perfectqueue/backend/rdb.rb', line 144

def update(id, timeout)
  connect {
    n = @db["UPDATE `#{@table}` SET timeout=? WHERE id=? AND created_at IS NOT NULL;", timeout, id].update
    if n <= 0
      raise CanceledError, "Task id=#{id} is canceled."
    end
    return nil
  }
end