Class: QC::DurableArray
- Inherits: Object
  - Object
    - QC::DurableArray
- Defined in:
- lib/queue_classic/durable_array.rb
Instance Method Summary
- #<<(details) ⇒ Object
- #b_head ⇒ Object
- #connection ⇒ Object
- #count ⇒ Object
- #delete(job) ⇒ Object
- #each ⇒ Object
- #execute(sql) ⇒ Object
- #find(job) ⇒ Object
- #find_one ⇒ Object
- #head ⇒ Object (also: #first)
- #initialize(args = {}) ⇒ DurableArray (constructor): A new instance of DurableArray.
- #lock_head ⇒ Object
- #log(msg) ⇒ Object
- #with_log(msg) ⇒ Object
Constructor Details
#initialize(args = {}) ⇒ DurableArray
Returns a new instance of DurableArray.
33 34 35 36 37 38 |
# File 'lib/queue_classic/durable_array.rb', line 33
#
# Builds a DurableArray: remembers the database string, opens the connection
# via #connection, lowers the session's message level to warnings, and issues
# LISTEN on the "jobs" channel.
#
# @param args [Hash] options; only :database is read here. It is handed to
#   #connection, which treats it as either a postgres:// URL or a database name.
def initialize(args = {})
  @db_string  = args[:database]
  @connection = connection
  execute("SET client_min_messages TO 'warning'")
  with_log("setup PG LISTEN") do
    execute("LISTEN jobs")
  end
end
Instance Method Details
#<<(details) ⇒ Object
40 41 42 43 |
# File 'lib/queue_classic/durable_array.rb', line 40
#
# Appends a job: inserts +details+ (serialized with #to_json) into the jobs
# table, then issues NOTIFY on the "jobs" channel with the payload 'new-job'.
#
# The JSON text is run through the connection's +escape_string+ before being
# embedded in the SQL literal, so a payload containing a single quote
# (e.g. {"name" => "O'Brien"}) neither breaks the statement nor injects SQL.
#
# @param details [#to_json] the job payload
# @return [Object] whatever #execute returns for the NOTIFY statement
def <<(details)
  payload = @connection.escape_string(details.to_json)
  execute("INSERT INTO jobs (details) VALUES ('#{payload}')")
  execute("NOTIFY jobs, 'new-job'")
end
#b_head ⇒ Object
77 78 79 80 81 82 83 84 |
# File 'lib/queue_classic/durable_array.rb', line 77
#
# Returns the locked head job if one is available right away; otherwise waits
# on the connection for a single notification and, if its third block argument
# equals "new-job", tries #lock_head once more.
#
# @return [Object, nil] the job from #lock_head, or nil when the retry found
#   nothing or the notification carried a different payload
def b_head
  job = lock_head
  return job if job

  # NOTE(review): block arguments are presumably (event, pid, payload) as
  # yielded by the pg gem's wait_for_notify — confirm against the pg docs.
  @connection.wait_for_notify do |_event, _pid, payload|
    job = lock_head if payload == "new-job"
  end
  job
end
#connection ⇒ Object
109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/queue_classic/durable_array.rb', line 109
#
# Opens a new PostgreSQL connection described by @db_string.
#
# When @db_string parses as a URL with the "postgres" scheme, its path (with
# slashes removed), user, password, host and - when the URL carries one - port
# are passed to PGconn.connect. Any other string is used as the database name.
#
# @return [Object] whatever PGconn.connect returns
def connection
  db_params = URI.parse(@db_string)
  return PGconn.connect(:dbname => @db_string) unless db_params.scheme == "postgres"

  options = {
    :dbname   => db_params.path.delete("/"),
    :user     => db_params.user,
    :password => db_params.password,
    :host     => db_params.host
  }
  # Only forward an explicit port; omitting the key keeps the driver default.
  options[:port] = db_params.port if db_params.port
  PGconn.connect(options)
end
#count ⇒ Object
45 46 47 |
# File 'lib/queue_classic/durable_array.rb', line 45
#
# Counts the rows in the jobs table.
#
# @return [Integer] the "count" column of the first result row, via #to_i
def count
  result = execute("SELECT COUNT(*) from jobs")
  first_row = result[0]
  first_row["count"].to_i
end
#delete(job) ⇒ Object
49 50 51 52 |
# File 'lib/queue_classic/durable_array.rb', line 49
#
# Deletes the row whose id equals +job.id+ from the jobs table, logging the
# operation through #with_log.
#
# @param job [#id] the job to remove
# @return [Object] the same +job+ that was passed in
def delete(job)
  with_log("deleting job #{job.id}") do
    execute("DELETE FROM jobs WHERE id = #{job.id}")
  end
  job
end
#each ⇒ Object
86 87 88 89 90 |
# File 'lib/queue_classic/durable_array.rb', line 86
#
# Iterates over every row of the jobs table in ascending id order, yielding
# the JSON-parsed "details" column of each row.
#
# Without a block an Enumerator is returned, rather than running the query
# and then failing with LocalJumpError on the first yield.
#
# @yield [details] the parsed "details" value of one row
# @return [Enumerator] when no block is given
# @return [Object] otherwise, whatever the query result's #each returns
def each
  return enum_for(:each) unless block_given?

  execute("SELECT * FROM jobs ORDER BY id ASC").each do |row|
    yield(JSON.parse(row["details"]))
  end
end
#execute(sql) ⇒ Object
92 93 94 |
# File 'lib/queue_classic/durable_array.rb', line 92
#
# Runs one SQL statement on this array's connection.
#
# @param sql [String] the statement text, forwarded unchanged
# @return [Object] whatever the connection's async_exec returns
def execute(sql)
  @connection.async_exec(sql)
end
#find(job) ⇒ Object
54 55 56 |
# File 'lib/queue_classic/durable_array.rb', line 54
#
# Looks up the stored job whose id equals +job.id+.
#
# @param job [#id] an object carrying the id to search for
# @return [Object, nil] the result of #find_one for that query
def find(job)
  find_one do
    "SELECT * FROM jobs WHERE id = #{job.id}"
  end
end
#find_one ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/queue_classic/durable_array.rb', line 96
#
# Executes the SQL string produced by the given block and wraps the result in
# a Job.
#
# Every returned row is converted to a Job (with "details" parsed from JSON);
# the last one built is returned.
#
# @yieldreturn [String] the SQL query to run
# @return [Job, nil] the Job for the last row, or nil when no rows came back
def find_one
  rows = execute(yield)
  return nil unless rows.count > 0

  jobs = rows.map do |row|
    Job.new(
      "id"        => row["id"],
      "details"   => JSON.parse(row["details"]),
      "locked_at" => row["locked_at"]
    )
  end
  jobs.pop
end
#head ⇒ Object Also known as: first
58 59 60 |
# File 'lib/queue_classic/durable_array.rb', line 58
#
# Fetches the job with the lowest id without locking it.
#
# @return [Object, nil] the result of #find_one for that query
def head
  find_one do
    "SELECT * FROM jobs ORDER BY id ASC LIMIT 1"
  end
end
#lock_head ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/queue_classic/durable_array.rb', line 63
#
# Claims the oldest unlocked job. Inside a single connection transaction it
# selects the lowest-id row whose locked_at is NULL (with FOR UPDATE) and, if
# one is found, stamps its locked_at with CURRENT_TIMESTAMP.
#
# The returned job is the object built from the SELECT; this method does not
# reload it after the UPDATE.
#
# @return [Object, nil] the job that was selected, or nil when none is unlocked
def lock_head
  job = nil
  with_log("start lock transaction") do
    @connection.transaction do
      job = find_one do
        "SELECT * FROM jobs WHERE locked_at IS NULL ORDER BY id ASC LIMIT 1 FOR UPDATE"
      end
      # Keep this conditional as the block's last expression so the value
      # handed back through the transaction to with_log stays the same.
      if job
        with_log("lock acquired for #{job.inspect}") do
          execute("UPDATE jobs SET locked_at = (CURRENT_TIMESTAMP) WHERE id = #{job.id} AND locked_at IS NULL")
        end
      end
    end
  end
  job
end
#log(msg) ⇒ Object
133 134 135 |
# File 'lib/queue_classic/durable_array.rb', line 133
#
# Writes one log line to standard output, prefixed with "| " and a tab.
#
# @param msg [String] text to print; it is concatenated with String#+
# @return [nil]
def log(msg)
  line = "| \t" + msg
  $stdout.puts(line)
end
#with_log(msg) ⇒ Object
123 124 125 126 127 128 129 130 131 |
# File 'lib/queue_classic/durable_array.rb', line 123
#
# Runs the given block and, when QC.logging_enabled? is true, logs +msg+
# followed by the result's cmd_status and result_error_message, each only if
# the result responds to that method.
#
# The block always runs, whether or not logging is enabled.
#
# @param msg [String] description of the operation being performed
# @yieldreturn [Object] the operation's result
# @return [Object] the block's result, unchanged
def with_log(msg)
  res = yield
  if QC.logging_enabled?
    log(msg)
    log(res.cmd_status) if res.respond_to?(:cmd_status)
    log(res.result_error_message) if res.respond_to?(:result_error_message)
  end
  res
end