Class: EQ::Queueing::Backends::LevelDB

Inherits:
Object
  • Object
show all
Defined in:
lib/eq-queueing/backends/leveldb.rb

Overview

Note:

This is a unoreded storage, so there is no guaranteed work order

Note:

assume there is nothing else than jobs

Defined Under Namespace

Classes: JobsCollection

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ LevelDB

Returns a new instance of LevelDB.



174
175
176
177
# File 'lib/eq-queueing/backends/leveldb.rb', line 174

def initialize config
  @db = ::LevelDB::DB.new config
  @jobs = JobsCollection.new(db)
end

Instance Attribute Details

#dbObject (readonly)

Returns the value of attribute db.



171
172
173
# File 'lib/eq-queueing/backends/leveldb.rb', line 171

def db
  @db
end

#jobsObject (readonly)

Returns the value of attribute jobs.



172
173
174
# File 'lib/eq-queueing/backends/leveldb.rb', line 172

def jobs
  @jobs
end

Instance Method Details

#clearObject



211
212
213
# File 'lib/eq-queueing/backends/leveldb.rb', line 211

def clear
  db.each{|k,v| db.delete k}
end

#count(name = nil) ⇒ Object



215
216
217
218
219
220
221
222
223
224
# File 'lib/eq-queueing/backends/leveldb.rb', line 215

def count name=nil
  case name
  when :waiting
    jobs.count_waiting
  when :working
    jobs.count_working
  else
    jobs.count
  end
end

#iteratorObject



226
227
228
229
230
# File 'lib/eq-queueing/backends/leveldb.rb', line 226

def iterator
  jobs.each do |job|
    yield job
  end
end

#pop(job_id) ⇒ Object



195
196
197
# File 'lib/eq-queueing/backends/leveldb.rb', line 195

def pop job_id
  jobs.delete job_id
end

#push(job) ⇒ Object

Parameters:



180
181
182
183
184
185
186
# File 'lib/eq-queueing/backends/leveldb.rb', line 180

def push job
  if job.unique? && jobs.exists?(job)
    false
  else
    jobs.push job
  end
end

#requeue_timed_out_jobsObject



199
200
201
202
203
204
205
206
207
208
209
# File 'lib/eq-queueing/backends/leveldb.rb', line 199

def requeue_timed_out_jobs
  requeued = 0
  jobs.working_iterator do |job_id, started_working_at|
    # older than x
    if started_working_at <= (Time.now - EQ.config.job_timeout)
      jobs.stop_working job_id
      requeued += 1
    end
  end
  requeued
end

#reserveObject



188
189
190
191
192
193
# File 'lib/eq-queueing/backends/leveldb.rb', line 188

def reserve
  if job_id = jobs.first_waiting
    jobs.start_working job_id
    EQ::Job.new job_id, jobs.find_queue(job_id), jobs.find_payload(job_id)
  end
end