Class: PerfectQueue::Backend::RDBBackend

Inherits:
Object
  • Object
show all
Defined in:
lib/perfectqueue/backend/rdb.rb

Defined Under Namespace

Classes: Token

Constant Summary collapse

MAX_RETRY =
::PerfectQueue::Backend::RDBCompatBackend::MAX_RETRY
DELETE_OFFSET =
::PerfectQueue::Backend::RDBCompatBackend::DELETE_OFFSET

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(uri, table, config = {}) ⇒ RDBBackend

Returns a new instance of RDBBackend.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/perfectqueue/backend/rdb.rb', line 12

def initialize(uri, table, config={})
  @uri = uri
  @table = table

  u = URI.parse(@uri)
  options = {
    max_connections: 1,
    user: u.user,
    password: u.password,
    host: u.host,
    port: u.port ? u.port.to_i : 3306
  }
  @pq_connect_timeout = config.fetch(:pq_connect_timeout, 20)
  options[:connect_timeout] = config.fetch(:connect_timeout, 3)
  options[:sslca] = config[:sslca] if config[:sslca]
  db_name = u.path.split('/')[1]
  @db = Sequel.mysql2(db_name, options)

  @mutex = Mutex.new
  connect {
    # connection test
  }
end

Instance Attribute Details

#dbObject (readonly)

Returns the value of attribute db.



36
37
38
# File 'lib/perfectqueue/backend/rdb.rb', line 36

def db
  @db
end

Instance Method Details

#cancel(id, delete_timeout = 3600, now = Process.clock_gettime(Process::CLOCK_REALTIME, :second)) ⇒ Object



51
52
53
54
55
56
# File 'lib/perfectqueue/backend/rdb.rb', line 51

def cancel(id, delete_timeout=3600, now=Process.clock_gettime(Process::CLOCK_REALTIME, :second))
  connect {
    n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND created_at IS NOT NULL;", now+delete_timeout-DELETE_OFFSET, id].update
    return n > 0
  }
end

#submit(id, data, time = Process.clock_gettime(Process::CLOCK_REALTIME, :second), resource = nil, max_running = nil) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/perfectqueue/backend/rdb.rb', line 38

def submit(id, data, time=Process.clock_gettime(Process::CLOCK_REALTIME, :second), resource=nil, max_running=nil)
  connect {
    begin
      data = Sequel::SQL::Blob.new(data)
      @db.sql_log_level = :debug
      n = @db["INSERT INTO `#{@table}` (id, timeout, data, created_at, resource, max_running) VALUES (?, ?, ?, ?, ?, ?);", id, time, data, time, resource, max_running].insert
      return true
    rescue Sequel::UniqueConstraintViolation => e
      return nil
    end
  }
end