Class: EQ::Queueing::Backends::Sequel
- Inherits:
-
Object
- Object
- EQ::Queueing::Backends::Sequel
- Includes:
- Logging
- Defined in:
- lib/eq-queueing/backends/sequel.rb
Overview
this class provides a queueing backend via Sequel ORM mapper basically any database adapter known by Sequel is supported configure via EQ::conig
Constant Summary collapse
- TABLE_NAME =
:jobs
Instance Attribute Summary collapse
-
#db ⇒ Object
readonly
Returns the value of attribute db.
Instance Method Summary collapse
-
#initialize(config) ⇒ Sequel
constructor
establishes the connection to the database and ensures that the jobs table is created.
-
#jobs ⇒ Object
list of all jobs.
-
#pop(id) ⇒ TrueClass, FalseClass
finishes a job in the working queue.
-
#push(payload) ⇒ Fixnum
Id of the job.
-
#requeue_timed_out_jobs ⇒ Fixnum
this re-enqueues jobs that timed out.
-
#reserve ⇒ Array<Fixnum, String>
pulls a job from the waiting stack and moves it to the working stack.
-
#update_job!(changed_job) ⇒ Object
updates a changed job object, uses the :id key to identify the job.
-
#waiting ⇒ Object
list of jobs waiting to be worked on.
-
#working ⇒ Object
list of jobs currentyl being worked on.
Methods included from Logging
Constructor Details
#initialize(config) ⇒ Sequel
establishes the connection to the database and ensures that the jobs table is created
17 18 19 20 |
# File 'lib/eq-queueing/backends/sequel.rb', line 17 def initialize config connect config create_table_if_not_exists! end |
Instance Attribute Details
#db ⇒ Object (readonly)
Returns the value of attribute db.
13 14 15 |
# File 'lib/eq-queueing/backends/sequel.rb', line 13 def db @db end |
Instance Method Details
#jobs ⇒ Object
list of all jobs
71 72 73 74 75 |
# File 'lib/eq-queueing/backends/sequel.rb', line 71 def jobs db[TABLE_NAME] rescue ::Sequel::DatabaseError => e retry if on_error e end |
#pop(id) ⇒ TrueClass, FalseClass
finishes a job in the working queue
50 51 52 53 54 |
# File 'lib/eq-queueing/backends/sequel.rb', line 50 def pop id jobs.where(id: id).delete == 1 rescue ::Sequel::DatabaseError => e retry if on_error e end |
#push(payload) ⇒ Fixnum
Returns id of the job.
24 25 26 27 28 |
# File 'lib/eq-queueing/backends/sequel.rb', line 24 def push payload jobs.insert payload: payload.to_sequel_blob, created_at: Time.now rescue ::Sequel::DatabaseError => e retry if on_error e end |
#requeue_timed_out_jobs ⇒ Fixnum
this re-enqueues jobs that timed out
87 88 89 90 91 |
# File 'lib/eq-queueing/backends/sequel.rb', line 87 def requeue_timed_out_jobs # 10 seconds ago jobs.where{started_working_at <= (Time.now - EQ.config.job_timeout)}\ .update(started_working_at: nil) end |
#reserve ⇒ Array<Fixnum, String>
pulls a job from the waiting stack and moves it to the working stack. sets a timestamp :started_working_at so that the working duration can be tracked.
35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/eq-queueing/backends/sequel.rb', line 35 def reserve db.transaction do if job = waiting.order(:id.asc).limit(1).first job[:started_working_at] = Time.now update_job!(job) [job[:id], job[:payload]] end end rescue ::Sequel::DatabaseError => e retry if on_error e end |
#update_job!(changed_job) ⇒ Object
updates a changed job object, uses the :id key to identify the job
79 80 81 82 83 |
# File 'lib/eq-queueing/backends/sequel.rb', line 79 def update_job! changed_job jobs.where(id: changed_job[:id]).update(changed_job) rescue ::Sequel::DatabaseError => e retry if on_error e end |
#waiting ⇒ Object
list of jobs waiting to be worked on
57 58 59 60 61 |
# File 'lib/eq-queueing/backends/sequel.rb', line 57 def waiting jobs.where(started_working_at: nil) rescue ::Sequel::DatabaseError => e retry if on_error e end |
#working ⇒ Object
list of jobs currentyl being worked on
64 65 66 67 68 |
# File 'lib/eq-queueing/backends/sequel.rb', line 64 def working waiting.invert rescue ::Sequel::DatabaseError => e retry if on_error e end |