Class: PerfectQueue::Backend::RDBCompatBackend

Inherits:
Object
  • Object
show all
Includes:
PerfectQueue::BackendHelper
Defined in:
lib/perfectqueue/backend/rdb_compat.rb

Defined Under Namespace

Classes: Token

Constant Summary collapse

MAX_SELECT_ROW =
8
MAX_RESOURCE =
(ENV['PQ_MAX_RESOURCE'] || 4).to_i
MAX_RETRY =

KEEPALIVE = 10

10

Instance Attribute Summary collapse

Attributes included from PerfectQueue::BackendHelper

#client

Instance Method Summary collapse

Methods included from PerfectQueue::BackendHelper

#close

Constructor Details

#initialize(client, config) ⇒ RDBCompatBackend

Returns a new instance of RDBCompatBackend.



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/perfectqueue/backend/rdb_compat.rb', line 27

def initialize(client, config)
  super

  require 'sequel'
  url = config[:url]
  @table = config[:table]
  unless @table
    raise ConfigError, ":table option is required"
  end

  #password = config[:password]
  #user = config[:user]
  @db = Sequel.connect(url, :max_connections=>1)
  @mutex = Mutex.new

  connect {
    # connection test
  }

  @sql = <<SQL
SELECT id, timeout, data, created_at, resource
FROM `#{@table}`
LEFT JOIN (
  SELECT resource AS res, COUNT(1) AS running
  FROM `#{@table}` AS T
  WHERE timeout > ? AND created_at IS NOT NULL AND resource IS NOT NULL
  GROUP BY resource
) AS R ON resource = res
WHERE timeout <= ? AND (running IS NULL OR running < #{MAX_RESOURCE})
ORDER BY timeout ASC LIMIT #{MAX_SELECT_ROW}
SQL

  # sqlite doesn't support SELECT ... FOR UPDATE but
  # sqlite doesn't need it because the db is not shared
  unless url.split('//',2)[0].to_s.include?('sqlite')
    @sql << 'FOR UPDATE'
  end
end

Instance Attribute Details

#dbObject (readonly)

Returns the value of attribute db.



66
67
68
# File 'lib/perfectqueue/backend/rdb_compat.rb', line 66

def db
  @db
end

Instance Method Details

#acquire(alive_time, max_acquire, options) ⇒ Object

> [AcquiredTask]



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/perfectqueue/backend/rdb_compat.rb', line 141

def acquire(alive_time, max_acquire, options)
  now = (options[:now] || Time.now).to_i
  next_timeout = now + alive_time

  connect {
    while true
      rows = 0
      @db.transaction do
        @db.fetch(@sql, now, now) {|row|
          unless row[:created_at]
            # finished task
            @db["DELETE FROM `#{@table}` WHERE id=?;", row[:id]].delete

          else
            ## optimistic lock is not needed because the row is locked for update
            #n = @db["UPDATE `#{@table}` SET timeout=? WHERE id=? AND timeout=?", timeout, row[:id], row[:timeout]].update
            n = @db["UPDATE `#{@table}` SET timeout=? WHERE id=?", next_timeout, row[:id]].update
            if n > 0
              attributes = create_attributes(nil, row)
              task_token = Token.new(row[:id])
              task = AcquiredTask.new(@client, row[:id], attributes, task_token)
              return [task]
            end
          end

          rows += 1
        }
      end
      break nil if rows < MAX_SELECT_ROW
    end
  }
end

#cancel_request(key, options) ⇒ Object

> nil



175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/perfectqueue/backend/rdb_compat.rb', line 175

def cancel_request(key, options)
  now = (options[:now] || Time.now).to_i

  # created_at=-1 means cancel_requested
  connect {
    n = @db["UPDATE `#{@table}` SET created_at=-1 WHERE id=? AND created_at IS NOT NULL;", key].update
    if n <= 0
      raise AlreadyFinishedError, "task key=#{key} does not exist or already finished."
    end
  }
  nil
end

#finish(task_token, retention_time, options) ⇒ Object

> nil



193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/perfectqueue/backend/rdb_compat.rb', line 193

def finish(task_token, retention_time, options)
  now = (options[:now] || Time.now).to_i
  delete_timeout = now + retention_time
  key = task_token.key

  connect {
    n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND created_at IS NOT NULL;", delete_timeout, key].update
    if n <= 0
      raise IdempotentAlreadyFinishedError, "task key=#{key} does not exist or already finished."
    end
  }
  nil
end

#force_finish(key, retention_time, options) ⇒ Object



188
189
190
# File 'lib/perfectqueue/backend/rdb_compat.rb', line 188

def force_finish(key, retention_time, options)
  finish(Token.new(key), retention_time, options)
end

#get_task_metadata(key, options) ⇒ Object

> TaskStatus



89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/perfectqueue/backend/rdb_compat.rb', line 89

def (key, options)
  now = (options[:now] || Time.now).to_i

  connect {
    row = @db.fetch("SELECT timeout, data, created_at, resource FROM `#{@table}` WHERE id=? LIMIT 1", key).first
    unless row
      raise NotFoundError, "task key=#{key} does no exist"
    end
    attributes = create_attributes(now, row)
    return TaskMetadata.new(@client, key, attributes)
  }
end

#heartbeat(task_token, alive_time, options) ⇒ Object

> nil



208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/perfectqueue/backend/rdb_compat.rb', line 208

def heartbeat(task_token, alive_time, options)
  now = (options[:now] || Time.now).to_i
  next_timeout = now + alive_time
  key = task_token.key

  connect {
    n = @db["UPDATE `#{@table}` SET timeout=? WHERE id=? AND created_at IS NOT NULL;", next_timeout, key].update
    if n <= 0
      row = @db.fetch("SELECT id, created_at FROM `#{@table}` WHERE id=? LIMIT 1", key).first
      if row == nil
        raise AlreadyFinishedError, "task key=#{key} already finished."
      elsif row[:created_at] == -1
        raise CancelRequestedError, "task key=#{key} is cancel requested."
      else
        raise AlreadyFinishedError, "task key=#{key} already finished."
      end
    end
  }
  nil
end

#init_database(options) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/perfectqueue/backend/rdb_compat.rb', line 73

def init_database(options)
  sql = %[
      CREATE TABLE IF NOT EXISTS `#{@table}` (
        id VARCHAR(256) NOT NULL,
        timeout INT NOT NULL,
        data BLOB NOT NULL,
        created_at INT,
        resource VARCHAR(256),
        PRIMARY KEY (id)
      );]
  connect {
    @db.run sql
  }
end

#list(options, &block) ⇒ Object

yield [TaskWithMetadata]



108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/perfectqueue/backend/rdb_compat.rb', line 108

def list(options, &block)
  now = (options[:now] || Time.now).to_i

  connect {
    #@db.fetch("SELECT id, timeout, data, created_at, resource FROM `#{@table}` WHERE !(created_at IS NULL AND timeout <= ?) ORDER BY timeout ASC;", now) {|row|
    @db.fetch("SELECT id, timeout, data, created_at, resource FROM `#{@table}` ORDER BY timeout ASC;", now) {|row|
      attributes = create_attributes(now, row)
      task = TaskWithMetadata.new(@client, row[:id], attributes)
      yield task
    }
  }
end

#preempt(key, alive_time, options) ⇒ Object

> AcquiredTask

Raises:



103
104
105
# File 'lib/perfectqueue/backend/rdb_compat.rb', line 103

def preempt(key, alive_time, options)
  raise NotSupportedError.new("preempt is not supported by rdb_compat backend")
end

#submit(key, type, data, options) ⇒ Object

> Task



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/perfectqueue/backend/rdb_compat.rb', line 122

def submit(key, type, data, options)
  now = (options[:now] || Time.now).to_i
  run_at = (options[:run_at] || now).to_i
  user = options[:user]
  priority = options[:priority]  # not supported
  data = data ? data.dup : {}
  data['type'] = type

  connect {
    begin
      n = @db["INSERT INTO `#{@table}` (id, timeout, data, created_at, resource) VALUES (?, ?, ?, ?, ?);", key, run_at, data.to_json, now, user].insert
      return Task.new(@client, key)
    rescue Sequel::DatabaseError
      raise IdempotentAlreadyExistsError, "task key=#{key} already exists"
    end
  }
end