Class: QC::DurableArray

Inherits:
Object
  • Object
show all
Defined in:
lib/queue_classic/durable_array.rb

Instance Method Summary collapse

Constructor Details

#initialize(args = {}) ⇒ DurableArray

Returns a new instance of DurableArray.



33
34
35
36
37
38
# File 'lib/queue_classic/durable_array.rb', line 33

def initialize(args={})
  @db_string  = args[:database]
  @connection = connection
  execute("SET client_min_messages TO 'warning'")
  with_log("setup PG LISTEN") { execute("LISTEN jobs") }
end

Instance Method Details

#<<(details) ⇒ Object



40
41
42
43
# File 'lib/queue_classic/durable_array.rb', line 40

def <<(details)
  execute("INSERT INTO jobs (details) VALUES ('#{details.to_json}')")
  execute("NOTIFY jobs, 'new-job'")
end

#b_headObject



77
78
79
80
81
82
83
84
# File 'lib/queue_classic/durable_array.rb', line 77

def b_head
  if job = lock_head
    job
  else
    @connection.wait_for_notify {|e,p,msg| job = lock_head if msg == "new-job" }
    job
  end
end

#connectionObject



109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/queue_classic/durable_array.rb', line 109

def connection
  db_params = URI.parse(@db_string)
  if db_params.scheme == "postgres"
    PGconn.connect(
      :dbname   => db_params.path.gsub("/",""),
      :user     => db_params.user,
      :password => db_params.password,
      :host     => db_params.host
    )
  else
    PGconn.connect(:dbname => @db_string)
  end
end

#countObject



45
46
47
# File 'lib/queue_classic/durable_array.rb', line 45

def count
  execute("SELECT COUNT(*) from jobs")[0]["count"].to_i
end

#delete(job) ⇒ Object



49
50
51
52
# File 'lib/queue_classic/durable_array.rb', line 49

def delete(job)
  with_log("deleting job #{job.id}") { execute("DELETE FROM jobs WHERE id = #{job.id}") }
  job
end

#eachObject



86
87
88
89
90
# File 'lib/queue_classic/durable_array.rb', line 86

def each
  execute("SELECT * FROM jobs ORDER BY id ASC").each do |r|
    yield(JSON.parse(r["details"]))
  end
end

#execute(sql) ⇒ Object



92
93
94
# File 'lib/queue_classic/durable_array.rb', line 92

def execute(sql)
  @connection.async_exec(sql)
end

#find(job) ⇒ Object



54
55
56
# File 'lib/queue_classic/durable_array.rb', line 54

def find(job)
  find_one {"SELECT * FROM jobs WHERE id = #{job.id}"}
end

#find_oneObject



96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/queue_classic/durable_array.rb', line 96

def find_one
  res = execute(yield)
  if res.count > 0
    res.map do |r|
      Job.new(
        "id"        => r["id"],
        "details"   => JSON.parse( r["details"]),
        "locked_at" => r["locked_at"]
      )
    end.pop
  end
end

#headObject Also known as: first



58
59
60
# File 'lib/queue_classic/durable_array.rb', line 58

def head
  find_one {"SELECT * FROM jobs ORDER BY id ASC LIMIT 1"}
end

#lock_headObject



63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/queue_classic/durable_array.rb', line 63

def lock_head
  job = nil
  with_log("start lock transaction") do
    @connection.transaction do
      if job = find_one {"SELECT * FROM jobs WHERE locked_at IS NULL ORDER BY id ASC LIMIT 1 FOR UPDATE"}
        with_log("lock acquired for #{job.inspect}") do
          execute("UPDATE jobs SET locked_at = (CURRENT_TIMESTAMP) WHERE id = #{job.id} AND locked_at IS NULL")
        end
      end
    end
  end
  job
end

#log(msg) ⇒ Object



133
134
135
# File 'lib/queue_classic/durable_array.rb', line 133

def log(msg)
  puts "| \t" + msg
end

#with_log(msg) ⇒ Object



123
124
125
126
127
128
129
130
131
# File 'lib/queue_classic/durable_array.rb', line 123

def with_log(msg)
  res = yield
  if QC.logging_enabled?
    log(msg)
    log(res.cmd_status)           if res.respond_to?(:cmd_status)
    log(res.result_error_message) if res.respond_to?(:result_error_message)
  end
  res
end