Class: EQ::Queueing::Backends::Sequel

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/eq-queueing/backends/sequel.rb

Overview

this class provides a queueing backend via Sequel ORM mapper basically any database adapter known by Sequel is supported configure via EQ::conig

Constant Summary collapse

TABLE_NAME =
:jobs

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#debug, #info, #log_error

Constructor Details

#initialize(config) ⇒ Sequel

establishes the connection to the database and ensures that the jobs table is created



17
18
19
20
# File 'lib/eq-queueing/backends/sequel.rb', line 17

def initialize config
  connect config
  create_table_if_not_exists!
end

Instance Attribute Details

#dbObject (readonly)

Returns the value of attribute db.



13
14
15
# File 'lib/eq-queueing/backends/sequel.rb', line 13

def db
  @db
end

Instance Method Details

#jobsObject

list of all jobs



71
72
73
74
75
# File 'lib/eq-queueing/backends/sequel.rb', line 71

def jobs
  db[TABLE_NAME]
rescue ::Sequel::DatabaseError => e
  retry if on_error e
end

#pop(id) ⇒ TrueClass, FalseClass

finishes a job in the working queue

Parameters:

  • id (Fixnum)

    of the job

Returns:

  • (TrueClass, FalseClass)

    true, when there was a job that could be deleted



50
51
52
53
54
# File 'lib/eq-queueing/backends/sequel.rb', line 50

def pop id
  jobs.where(id: id).delete == 1
rescue ::Sequel::DatabaseError => e
  retry if on_error e
end

#push(payload) ⇒ Fixnum

Returns id of the job.

Parameters:

  • payload (#to_sequel_block)

Returns:

  • (Fixnum)

    id of the job



24
25
26
27
28
# File 'lib/eq-queueing/backends/sequel.rb', line 24

def push payload
  jobs.insert payload: payload.to_sequel_blob, created_at: Time.now
rescue ::Sequel::DatabaseError => e
  retry if on_error e
end

#requeue_timed_out_jobsFixnum

this re-enqueues jobs that timed out

Returns:

  • (Fixnum)

    number of jobs that were re-enqueued



87
88
89
90
91
# File 'lib/eq-queueing/backends/sequel.rb', line 87

def requeue_timed_out_jobs
  # 10 seconds ago
  jobs.where{started_working_at <= (Time.now - EQ.config.job_timeout)}\
      .update(started_working_at: nil)
end

#reserveArray<Fixnum, String>

pulls a job from the waiting stack and moves it to the working stack. sets a timestamp :started_working_at so that the working duration can be tracked.

Parameters:

  • now (Time)

Returns:

  • (Array<Fixnum, String>)

    job data consisting of id and payload



35
36
37
38
39
40
41
42
43
44
45
# File 'lib/eq-queueing/backends/sequel.rb', line 35

def reserve
  db.transaction do
    if job = waiting.order(:id.asc).limit(1).first
      job[:started_working_at] = Time.now
      update_job!(job)
      [job[:id], job[:payload]]
    end
  end
rescue ::Sequel::DatabaseError => e
  retry if on_error e
end

#update_job!(changed_job) ⇒ Object

updates a changed job object, uses the :id key to identify the job

Parameters:

  • changed (Hash)

    job



79
80
81
82
83
# File 'lib/eq-queueing/backends/sequel.rb', line 79

def update_job! changed_job
  jobs.where(id: changed_job[:id]).update(changed_job)
rescue ::Sequel::DatabaseError => e
  retry if on_error e
end

#waitingObject

list of jobs waiting to be worked on



57
58
59
60
61
# File 'lib/eq-queueing/backends/sequel.rb', line 57

def waiting
  jobs.where(started_working_at: nil)
rescue ::Sequel::DatabaseError => e
  retry if on_error e
end

#workingObject

list of jobs currentyl being worked on



64
65
66
67
68
# File 'lib/eq-queueing/backends/sequel.rb', line 64

def working
  waiting.invert
rescue ::Sequel::DatabaseError => e
  retry if on_error e
end