Class: QC::Queue
- Inherits:
-
Object
- Object
- QC::Queue
- Defined in:
- lib/queue_classic/queue.rb
Overview
The queue class maps a queue abstraction onto a database table.
Instance Attribute Summary
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#top_bound ⇒ Object
readonly
Returns the value of attribute top_bound.
Instance Method Summary
- #conn_adapter ⇒ Object
- #conn_adapter=(a) ⇒ Object
- #count ⇒ Object
- #delete(id) ⇒ Object
- #delete_all ⇒ Object
-
#enqueue(method, *args) ⇒ Object
enqueue(m,a) inserts a row into the jobs table and triggers a notification.
-
#enqueue_at(timestamp, method, *args) ⇒ Object
enqueue_at(t,m,a) inserts a row into the jobs table representing a job to be executed not before the specified time.
-
#enqueue_in(seconds, method, *args) ⇒ Object
enqueue_in(t,m,a) inserts a row into the jobs table representing a job to be executed not before the specified time offset.
-
#initialize(name, top_bound = nil) ⇒ Queue
constructor
A new instance of Queue.
- #lock ⇒ Object
- #unlock(id) ⇒ Object
Constructor Details
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
# File 'lib/queue_classic/queue.rb', line 9
def name
  @name
end
#top_bound ⇒ Object (readonly)
Returns the value of attribute top_bound.
# File 'lib/queue_classic/queue.rb', line 9
def top_bound
  @top_bound
end
Instance Method Details
#conn_adapter ⇒ Object
# File 'lib/queue_classic/queue.rb', line 19
def conn_adapter
  @adapter ||= QC.default_conn_adapter
end
#conn_adapter=(a) ⇒ Object
# File 'lib/queue_classic/queue.rb', line 15
def conn_adapter=(a)
  @adapter = a
end
#count ⇒ Object
# File 'lib/queue_classic/queue.rb', line 106
def count
  QC.log_yield(:measure => 'queue.count') do
    s = "SELECT COUNT(*) FROM #{TABLE_NAME} WHERE q_name = $1"
    r = conn_adapter.execute(s, name)
    r["count"].to_i
  end
end
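The following is a minimal sketch of how #conn_adapter= and #count fit together, for example in a test that should not touch a database. `FakeAdapter` and `SketchQueue` are hypothetical stand-ins written for this illustration, not part of queue_classic, and the table name `queue_classic_jobs` is an assumption. The sketch also assumes that the adapter's `execute` returns a single row as a hash with string values, which is what the `r["count"].to_i` call in the listing above suggests.

```ruby
# Hypothetical adapter for illustration; it records each call and returns a canned row.
class FakeAdapter
  attr_reader :calls

  def initialize(row)
    @row = row
    @calls = []
  end

  def execute(sql, *params)
    @calls << [sql, params]
    @row
  end
end

# Minimal stand-in for QC::Queue, limited to the pieces shown in the listings above.
class SketchQueue
  TABLE_NAME = 'queue_classic_jobs' # assumed table name

  attr_reader :name
  attr_writer :conn_adapter

  def initialize(name)
    @name = name
  end

  def count
    s = "SELECT COUNT(*) FROM #{TABLE_NAME} WHERE q_name = $1"
    r = @conn_adapter.execute(s, name)
    r['count'].to_i # the row value arrives as a string and is converted here
  end
end

adapter = FakeAdapter.new({ 'count' => '3' })
queue = SketchQueue.new('default')
queue.conn_adapter = adapter
pending = queue.count
```

With the canned row above, `pending` is the integer 3, and `adapter.calls` holds the SQL text together with the queue name that was bound to `$1`.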
#delete(id) ⇒ Object
# File 'lib/queue_classic/queue.rb', line 93
def delete(id)
  QC.log_yield(:measure => 'queue.delete') do
    conn_adapter.execute("DELETE FROM #{TABLE_NAME} where id = $1", id)
  end
end
#delete_all ⇒ Object
# File 'lib/queue_classic/queue.rb', line 99
def delete_all
  QC.log_yield(:measure => 'queue.delete_all') do
    s = "DELETE FROM #{TABLE_NAME} WHERE q_name = $1"
    conn_adapter.execute(s, name)
  end
end
#enqueue(method, *args) ⇒ Object
enqueue(m,a) inserts a row into the jobs table and triggers a notification. The job's queue is represented by a name column in the row. There is a trigger on the table which will send a NOTIFY event on a channel which corresponds to the name of the queue.
The method argument is a string-encoded Ruby expression. The expression will be split on the `.` character and then `eval`d. Examples of the method argument include: `puts`, `Kernel.puts`, `MyObject.new.puts`.
The args argument will be encoded as JSON and stored as a JSON datatype in the row. (If the version of PG does not support JSON, then the args will be stored as text.) The args are stored as a collection and then splatted inside the worker. Examples of args include: `'hello world'`, `['hello world']`, `'hello', 'world'`.
# File 'lib/queue_classic/queue.rb', line 37
def enqueue(method, *args)
  QC.log_yield(:measure => 'queue.enqueue') do
    s = "INSERT INTO #{TABLE_NAME} (q_name, method, args) VALUES ($1, $2, $3)"
    conn_adapter.execute(s, name, method, JSON.dump(args))
  end
end
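To make the round trip of the method and args arguments concrete, here is a small sketch that needs no database. `encode_args` repeats the `JSON.dump(args)` call from the listing above. `dispatch` is only an assumption about what the worker does with the stored values (split the method string at its last `.`, `eval` the receiver, and send the message with the args splatted); the actual worker code is not part of this page. `Greeter` is a hypothetical job target.

```ruby
require 'json'

# Hypothetical job target used only for this illustration.
module Greeter
  def self.greet(greeting, name)
    "#{greeting}, #{name}!"
  end
end

# What enqueue stores: the splatted args collected into an array and dumped as JSON.
def encode_args(*args)
  JSON.dump(args)
end

# Assumed worker-side dispatch: split at the last '.', eval the receiver,
# then send the message with the decoded args splatted.
def dispatch(method, encoded_args)
  receiver_str, _, message = method.rpartition('.')
  receiver = eval(receiver_str)
  receiver.send(message, *JSON.parse(encoded_args))
end

stored = encode_args('hello', 'world')
result = dispatch('Greeter.greet', stored)
```

Here `stored` is the JSON text `["hello","world"]`, and `result` is `"hello, world!"`. Because the args pass through JSON, only JSON-representable values survive the trip, and symbols come back as strings.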
#enqueue_at(timestamp, method, *args) ⇒ Object
enqueue_at(t,m,a) inserts a row into the jobs table representing a job to be executed not before the specified time. The timestamp argument must be a Time object or a float timestamp. The method and args arguments must be in the form described in the documentation for the #enqueue method.
# File 'lib/queue_classic/queue.rb', line 49
def enqueue_at(timestamp, method, *args)
  offset = Time.at(timestamp) - Time.now
  enqueue_in(offset, method, *args)
end
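The offset arithmetic can be checked in isolation. In this sketch, `offset_for` is a hypothetical helper that repeats the subtraction from the listing above; the `now` parameter is added only so the example is deterministic and is not part of the queue_classic API.

```ruby
# Repeats the offset computation from enqueue_at.
# `now` is injectable here only to keep the example deterministic.
def offset_for(timestamp, now = Time.now)
  Time.at(timestamp) - now
end

now = Time.at(1_700_000_000)

from_time  = offset_for(now + 90, now)         # Time input
from_float = offset_for(1_700_000_090.5, now)  # float (epoch seconds) input
in_past    = offset_for(now - 10, now)         # a timestamp that has already passed
```

Subtracting two Time objects yields a Float number of seconds, so `from_time` is 90.0 and `from_float` is 90.5. A timestamp in the past gives a negative offset, which is then handed to #enqueue_in unchanged.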
#enqueue_in(seconds, method, *args) ⇒ Object
enqueue_in(t,m,a) inserts a row into the jobs table representing a job to be executed not before the specified time offset. The seconds argument should be an integer; it is converted with `to_i` before being interpolated into the SQL, so fractional values are truncated. The method and args arguments must be in the form described in the documentation for the #enqueue method.
# File 'lib/queue_classic/queue.rb', line 59
def enqueue_in(seconds, method, *args)
  QC.log_yield(:measure => 'queue.enqueue') do
    s = "INSERT INTO #{TABLE_NAME} (q_name, method, args, scheduled_at)
         VALUES ($1, $2, $3, now() + interval '#{seconds.to_i} seconds')"
    conn_adapter.execute(s, name, method, JSON.dump(args))
  end
end
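Since the seconds value is interpolated into the SQL text rather than bound as a parameter, it helps to see what `to_i` does to different inputs. The sketch below only builds the statement string; `enqueue_in_sql` is a hypothetical helper, and the table name `queue_classic_jobs` is an assumption.

```ruby
TABLE_NAME = 'queue_classic_jobs' # assumed table name

# Builds the same statement text as enqueue_in, without executing it.
def enqueue_in_sql(seconds)
  "INSERT INTO #{TABLE_NAME} (q_name, method, args, scheduled_at) " \
  "VALUES ($1, $2, $3, now() + interval '#{seconds.to_i} seconds')"
end

float_sql  = enqueue_in_sql(90.7)          # fractional part is dropped
string_sql = enqueue_in_sql('45')          # numeric strings are converted
junk_sql   = enqueue_in_sql('1; DROP x')   # to_i keeps only the leading integer
```

The three statements end in `interval '90 seconds'`, `interval '45 seconds'`, and `interval '1 seconds'` respectively. The `to_i` call is what keeps arbitrary text out of the interpolated interval.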
#lock ⇒ Object
# File 'lib/queue_classic/queue.rb', line 67
def lock
  QC.log_yield(:measure => 'queue.lock') do
    s = "SELECT * FROM lock_head($1, $2)"
    if r = conn_adapter.execute(s, name, top_bound)
      {}.tap do |job|
        job[:id] = r["id"]
        job[:q_name] = r["q_name"]
        job[:method] = r["method"]
        job[:args] = JSON.parse(r["args"])
        if r["scheduled_at"]
          job[:scheduled_at] = Time.parse(r["scheduled_at"])
          ttl = Integer((Time.now - job[:scheduled_at]) * 1000)
          QC.measure("time-to-lock=#{ttl}ms source=#{name}")
        end
      end
    end
  end
end
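As a rough illustration of the hash that #lock builds, the sketch below applies the same conversions to a hand-written row. The row contents are hypothetical and assume the adapter returns string values, as the `JSON.parse` and `Time.parse` calls in the listing suggest. `build_job` is a helper written for this example; unlike #lock, it returns the time-to-lock value alongside the job instead of passing it to `QC.measure`.

```ruby
require 'json'
require 'time'

# Hypothetical row, shaped like what lock_head is assumed to return.
row = {
  'id'           => '42',
  'q_name'       => 'default',
  'method'       => 'Kernel.puts',
  'args'         => '["hello world"]',
  'scheduled_at' => '2024-01-01 12:00:00 +0000'
}

# Applies the same conversions as #lock; `now` is injectable for determinism.
def build_job(r, now = Time.now)
  ttl = nil
  job = {}.tap do |j|
    j[:id] = r['id']
    j[:q_name] = r['q_name']
    j[:method] = r['method']
    j[:args] = JSON.parse(r['args'])
    if r['scheduled_at']
      j[:scheduled_at] = Time.parse(r['scheduled_at'])
      # Milliseconds between the scheduled time and the moment of locking.
      ttl = Integer((now - j[:scheduled_at]) * 1000)
    end
  end
  [job, ttl]
end

job, ttl_ms = build_job(row, Time.parse('2024-01-01 12:00:02 +0000'))
```

Note that `job[:id]` stays a string here because the sketch, like the listing, copies it without conversion. When no row is available, the `if` in #lock has no else branch, so the method returns nil.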
#unlock(id) ⇒ Object
# File 'lib/queue_classic/queue.rb', line 86
def unlock(id)
  QC.log_yield(:measure => 'queue.unlock') do
    s = "UPDATE #{TABLE_NAME} set locked_at = null where id = $1"
    conn_adapter.execute(s, id)
  end
end