Class: PerfectQueue::Backend::RDBBackend
- Inherits:
-
Object
- Object
- PerfectQueue::Backend::RDBBackend
- Defined in:
- lib/perfectqueue/backend/rdb.rb
Defined Under Namespace
Classes: Token
Constant Summary collapse
- MAX_RETRY =
::PerfectQueue::Backend::RDBCompatBackend::MAX_RETRY
- DELETE_OFFSET =
::PerfectQueue::Backend::RDBCompatBackend::DELETE_OFFSET
Instance Attribute Summary collapse
-
#db ⇒ Object
readonly
Returns the value of attribute db.
Instance Method Summary collapse
- #cancel(id, delete_timeout = 3600, now = Process.clock_gettime(Process::CLOCK_REALTIME, :second)) ⇒ Object
-
#initialize(uri, table, config = {}) ⇒ RDBBackend
constructor
A new instance of RDBBackend.
- #submit(id, data, time = Process.clock_gettime(Process::CLOCK_REALTIME, :second), resource = nil, max_running = nil) ⇒ Object
Constructor Details
#initialize(uri, table, config = {}) ⇒ RDBBackend
Returns a new instance of RDBBackend.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/perfectqueue/backend/rdb.rb', line 12 def initialize(uri, table, config={}) @uri = uri @table = table u = URI.parse(@uri) = { max_connections: 1, user: u.user, password: u.password, host: u.host, port: u.port ? u.port.to_i : 3306 } @pq_connect_timeout = config.fetch(:pq_connect_timeout, 20) [:connect_timeout] = config.fetch(:connect_timeout, 3) [:sslca] = config[:sslca] if config[:sslca] db_name = u.path.split('/')[1] @db = Sequel.mysql2(db_name, ) @mutex = Mutex.new connect { # connection test } end |
Instance Attribute Details
#db ⇒ Object (readonly)
Returns the value of attribute db.
36 37 38 |
# File 'lib/perfectqueue/backend/rdb.rb', line 36 def db @db end |
Instance Method Details
#cancel(id, delete_timeout = 3600, now = Process.clock_gettime(Process::CLOCK_REALTIME, :second)) ⇒ Object
51 52 53 54 55 56 |
# File 'lib/perfectqueue/backend/rdb.rb', line 51 def cancel(id, delete_timeout=3600, now=Process.clock_gettime(Process::CLOCK_REALTIME, :second)) connect { n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND created_at IS NOT NULL;", now+delete_timeout-DELETE_OFFSET, id].update return n > 0 } end |
#submit(id, data, time = Process.clock_gettime(Process::CLOCK_REALTIME, :second), resource = nil, max_running = nil) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/perfectqueue/backend/rdb.rb', line 38 def submit(id, data, time=Process.clock_gettime(Process::CLOCK_REALTIME, :second), resource=nil, max_running=nil) connect { begin data = Sequel::SQL::Blob.new(data) @db.sql_log_level = :debug n = @db["INSERT INTO `#{@table}` (id, timeout, data, created_at, resource, max_running) VALUES (?, ?, ?, ?, ?, ?);", id, time, data, time, resource, max_running].insert return true rescue Sequel::UniqueConstraintViolation => e return nil end } end |