Class: PerfectQueue::RDBBackend
- Defined in:
- lib/perfectqueue/backend/rdb.rb
Constant Summary collapse
- MAX_SELECT_ROW =
8
- MAX_RETRY =
KEEPALIVE = 10
10
Instance Method Summary collapse
- #acquire(timeout, now = Time.now.to_i) ⇒ Object
- #cancel(id, delete_timeout = 3600, now = Time.now.to_i) ⇒ Object
- #create_tables ⇒ Object
- #finish(id, delete_timeout = 3600, now = Time.now.to_i) ⇒ Object
-
#initialize(uri, table, config = {}) ⇒ RDBBackend
constructor
A new instance of RDBBackend.
- #list(&block) ⇒ Object
- #submit(id, data, time = Time.now.to_i, resource = nil, max_running = nil) ⇒ Object
- #update(id, timeout) ⇒ Object
Methods inherited from Backend
Constructor Details
#initialize(uri, table, config = {}) ⇒ RDBBackend
Returns a new instance of RDBBackend.
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/perfectqueue/backend/rdb.rb', line 6 def initialize(uri, table, config={}) require 'sequel' require 'uri' @uri = uri @table = table u = URI.parse(@uri) = { max_connections: 1, user: u.user, password: u.password, host: u.host, port: u.port ? u.port.to_i : 3306 } [:sslca] = config[:sslca] if config[:sslca] db_name = @uri.path.split('/')[1] @db = Sequel.mysql2(db_name, ) #@last_time = Time.now.to_i @mutex = Mutex.new #init_db(@uri.split('//',2)[0]) connect { # connection test } @sql = <<SQL SELECT id, timeout, data, created_at, resource, max_running/running AS weight FROM `#{@table}` LEFT JOIN ( SELECT resource AS res, COUNT(1) AS running FROM `#{@table}` AS T WHERE timeout > ? AND created_at IS NOT NULL AND resource IS NOT NULL GROUP BY resource ) AS R ON resource = res WHERE timeout <= ? AND (max_running-running IS NULL OR max_running-running > 0) ORDER BY weight IS NOT NULL, weight DESC, timeout ASC LIMIT #{MAX_SELECT_ROW} SQL # sqlite doesn't support SELECT ... FOR UPDATE but # sqlite doesn't need it because the db is not shared unless @uri.split('//',2)[0].to_s.include?('sqlite') @sql << 'FOR UPDATE' end end |
Instance Method Details
#acquire(timeout, now = Time.now.to_i) ⇒ Object
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/perfectqueue/backend/rdb.rb', line 110 def acquire(timeout, now=Time.now.to_i) connect { while true rows = 0 @db.transaction do @db.fetch(@sql, now, now) {|row| unless row[:created_at] # finished/canceled task @db["DELETE FROM `#{@table}` WHERE id=?;", row[:id]].delete else ## optimistic lock is not needed because the row is locked for update #n = @db["UPDATE `#{@table}` SET timeout=? WHERE id=? AND timeout=?", timeout, row[:id], row[:timeout]].update n = @db["UPDATE `#{@table}` SET timeout=? WHERE id=?", timeout, row[:id]].update if n > 0 return row[:id], Task.new(row[:id], row[:created_at], row[:data], row[:resource]) end end rows += 1 } end break nil if rows < MAX_SELECT_ROW end } end |
#cancel(id, delete_timeout = 3600, now = Time.now.to_i) ⇒ Object
154 155 156 |
# File 'lib/perfectqueue/backend/rdb.rb', line 154 def cancel(id, delete_timeout=3600, now=Time.now.to_i) finish(id, delete_timeout, now) end |
#create_tables ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/perfectqueue/backend/rdb.rb', line 51 def create_tables sql = '' sql << "CREATE TABLE IF NOT EXISTS `#{@table}` (" sql << " id VARCHAR(256) NOT NULL," sql << " timeout INT NOT NULL," sql << " data BLOB NOT NULL," sql << " created_at INT," sql << " resource VARCHAR(256)," sql << " max_running INT," sql << " PRIMARY KEY (id)" sql << ");" # TODO index connect { @db.run sql } end |
#finish(id, delete_timeout = 3600, now = Time.now.to_i) ⇒ Object
137 138 139 140 141 142 |
# File 'lib/perfectqueue/backend/rdb.rb', line 137 def finish(id, delete_timeout=3600, now=Time.now.to_i) connect { n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND created_at IS NOT NULL;", now+delete_timeout, id].update return n > 0 } end |
#list(&block) ⇒ Object
100 101 102 103 104 |
# File 'lib/perfectqueue/backend/rdb.rb', line 100 def list(&block) @db.fetch("SELECT id, timeout, data, created_at, resource FROM `#{@table}` WHERE created_at IS NOT NULL ORDER BY timeout ASC;") {|row| yield row[:id], row[:created_at], row[:data], row[:timeout], row[:resource] } end |
#submit(id, data, time = Time.now.to_i, resource = nil, max_running = nil) ⇒ Object
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/perfectqueue/backend/rdb.rb', line 158 def submit(id, data, time=Time.now.to_i, resource=nil, max_running=nil) connect { begin data = Sequel::SQL::Blob.new(data) n = @db["INSERT INTO `#{@table}` (id, timeout, data, created_at, resource, max_running) VALUES (?, ?, ?, ?, ?, ?);", id, time, data, time, resource, max_running].insert return true rescue Sequel::DatabaseError => e # Sequel doesn't provide error classes to distinguish duplicate-entry from other # errors like connectivity error. This code assumes the driver is mysql2 and # the error message is "Mysql::ServerError::DupEntry: Duplicate entry" if /: Duplicate entry/ =~ e.to_s return nil end raise e end } end |
#update(id, timeout) ⇒ Object
144 145 146 147 148 149 150 151 152 |
# File 'lib/perfectqueue/backend/rdb.rb', line 144 def update(id, timeout) connect { n = @db["UPDATE `#{@table}` SET timeout=? WHERE id=? AND created_at IS NOT NULL;", timeout, id].update if n <= 0 raise CanceledError, "Task id=#{id} is canceled." end return nil } end |