Class: PerfectQueue::RDBBackend
- Defined in:
- lib/perfectqueue/backend/rdb.rb
Constant Summary collapse
- MAX_SELECT_ROW = 32
Instance Method Summary collapse
- #acquire(timeout, now = Time.now.to_i) ⇒ Object
- #cancel(id, delete_timeout = 3600, now = Time.now.to_i) ⇒ Object
- #finish(id, delete_timeout = 3600, now = Time.now.to_i) ⇒ Object
- #initialize(uri, table) ⇒ RDBBackend (constructor): returns a new instance of RDBBackend.
- #list(&block) ⇒ Object
- #submit(id, data, time = Time.now.to_i) ⇒ Object
- #update(id, timeout) ⇒ Object
Methods inherited from Backend
Constructor Details
#initialize(uri, table) ⇒ RDBBackend
Returns a new instance of RDBBackend.
6 7 8 9 10 11 12 |
# File 'lib/perfectqueue/backend/rdb.rb', line 6
# Opens a Sequel connection for +uri+ and runs init_db for its scheme.
#
# @param uri [String] connection URI handed to Sequel.connect; the text
#   before the first ':' is what init_db receives
# @param table [String] table name interpolated into this backend's SQL
def initialize(uri, table)
  # Loaded on construction rather than at file load time.
  require 'sequel'

  @uri = uri
  @table = table
  @db = Sequel.connect(@uri)

  scheme = @uri.split(':', 2).first
  init_db(scheme)
end
Instance Method Details
#acquire(timeout, now = Time.now.to_i) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/perfectqueue/backend/rdb.rb', line 58
# Claims one due task and moves its timeout forward.
#
# Rows whose timeout is <= +now+ are read in ascending timeout order, at most
# MAX_SELECT_ROW per query. A row with a NULL created_at (the state written by
# #finish) is deleted. Any other row is claimed with an UPDATE that also
# matches the timeout value that was just read, so the claim only succeeds if
# the row still has that timeout. The first successful claim is returned.
#
# @param timeout [Integer] timeout value written to the claimed row
# @param now [Integer] rows with timeout <= now are candidates
# @return [Array, nil] +[id, Task]+ for the claimed task, or nil once a query
#   returns fewer than MAX_SELECT_ROW rows without a successful claim
def acquire(timeout, now=Time.now.to_i)
  connect {
    loop do
      rows = 0
      @db.fetch("SELECT id, timeout, data, created_at FROM `#{@table}` WHERE timeout <= ? ORDER BY timeout ASC LIMIT #{MAX_SELECT_ROW};", now) {|row|
        if row[:created_at]
          n = @db["UPDATE `#{@table}` SET timeout=? WHERE id=? AND timeout=?;", timeout, row[:id], row[:timeout]].update
          if n > 0
            return row[:id], Task.new(row[:id], row[:created_at], row[:data])
          end
        else
          # finished/canceled task
          @db["DELETE FROM `#{@table}` WHERE id=?;", row[:id]].delete
        end
        rows += 1
      }
      # A short batch means every due row has been examined; a full batch
      # may have more due rows behind it, so query again.
      return nil if rows < MAX_SELECT_ROW
    end
  }
end
#cancel(id, delete_timeout = 3600, now = Time.now.to_i) ⇒ Object
101 102 103 |
# File 'lib/perfectqueue/backend/rdb.rb', line 101
# Cancels task +id+. This forwards all three arguments to #finish unchanged
# and returns whatever #finish returns.
#
# @param id task identifier
# @param delete_timeout [Integer] passed through to #finish (default 3600)
# @param now [Integer] passed through to #finish (default: current epoch seconds)
def cancel(id, delete_timeout=3600, now=Time.now.to_i)
  finish(id, delete_timeout, now)
end
#finish(id, delete_timeout = 3600, now = Time.now.to_i) ⇒ Object
84 85 86 87 88 89 |
# File 'lib/perfectqueue/backend/rdb.rb', line 84
# Marks task +id+ as finished by setting created_at to NULL and timeout to
# +now + delete_timeout+. Only rows whose created_at is not NULL are touched.
#
# @param id task identifier
# @param delete_timeout [Integer] added to +now+ to form the new timeout
# @param now [Integer] base time for the new timeout
# @return [Boolean] true when the UPDATE affected at least one row
def finish(id, delete_timeout=3600, now=Time.now.to_i)
  connect do
    new_timeout = now + delete_timeout
    affected = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL WHERE id=? AND created_at IS NOT NULL;", new_timeout, id].update
    return affected > 0
  end
end
#list(&block) ⇒ Object
50 51 52 53 54 |
# File 'lib/perfectqueue/backend/rdb.rb', line 50
# Yields every row whose created_at is not NULL, in ascending timeout order.
#
# @yield [id, created_at, data, timeout] column values of one row
def list(&block)
  query = "SELECT id, timeout, data, created_at FROM `#{@table}` WHERE created_at IS NOT NULL ORDER BY timeout ASC;"
  @db.fetch(query) do |task_row|
    yield task_row[:id], task_row[:created_at], task_row[:data], task_row[:timeout]
  end
end
#submit(id, data, time = Time.now.to_i) ⇒ Object
105 106 107 108 109 110 111 112 113 114 |
# File 'lib/perfectqueue/backend/rdb.rb', line 105
# Inserts a new task row; +time+ is stored as both its timeout and its
# created_at.
#
# @param id task identifier
# @param data task payload stored in the data column
# @param time [Integer] value written to both timeout and created_at
# @return [true, nil] true when the INSERT succeeded, nil when it raised
#   Sequel::DatabaseError
def submit(id, data, time=Time.now.to_i)
  connect {
    begin
      @db["INSERT INTO `#{@table}` (id, timeout, data, created_at) VALUES (?, ?, ?, ?);", id, time, data, time].insert
      return true
    rescue Sequel::DatabaseError
      # NOTE(review): presumably meant for a duplicate id, but every
      # Sequel::DatabaseError is reported the same way — confirm with callers.
      return nil
    end
  }
end
#update(id, timeout) ⇒ Object
91 92 93 94 95 96 97 98 99 |
# File 'lib/perfectqueue/backend/rdb.rb', line 91
# Sets a new timeout on task +id+, provided its created_at is not NULL.
#
# @param id task identifier
# @param timeout [Integer] new timeout value for the row
# @return [nil]
# @raise [CanceledError] when the UPDATE affected no rows
def update(id, timeout)
  connect do
    affected = @db["UPDATE `#{@table}` SET timeout=? WHERE id=? AND created_at IS NOT NULL;", timeout, id].update
    raise CanceledError, "Task id=#{id} is canceled." unless affected > 0
    return nil
  end
end