Class: QC::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/queue_classic/queue.rb

Overview

The queue class maps a queue abstraction onto a database table.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, top_bound = nil) ⇒ Queue

Returns a new instance of Queue.



10
11
12
13
# File 'lib/queue_classic/queue.rb', line 10

# Creates a queue object for the jobs stored under +name+.
#
# name      - value stored in @name; the other methods bind it to the
#             q_name column in their SQL.
# top_bound - optional value stored in @top_bound; when nil (or false) it
#             falls back to QC::TOP_BOUND. It is later passed as the second
#             argument to the lock_head SQL function.
def initialize(name, top_bound=nil)
  @name = name
  @top_bound = top_bound
  # Apply the library-wide default only when the caller supplied nothing.
  @top_bound ||= QC::TOP_BOUND
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



9
10
11
# File 'lib/queue_classic/queue.rb', line 9

# Read-only accessor: returns the queue name held in @name
# (assigned from the first constructor argument).
def name
  @name
end

#top_bound ⇒ Object (readonly)

Returns the value of attribute top_bound.



9
10
11
# File 'lib/queue_classic/queue.rb', line 9

# Read-only accessor: returns the value held in @top_bound, i.e. the
# constructor's top_bound argument or QC::TOP_BOUND when none was given.
def top_bound
  @top_bound
end

Instance Method Details

#conn_adapter ⇒ Object



19
20
21
# File 'lib/queue_classic/queue.rb', line 19

# Returns the connection adapter used by this queue.
#
# If an adapter has already been assigned (or memoized by an earlier call)
# it is returned as-is; otherwise QC.default_conn_adapter is asked for one
# and the result is remembered in @adapter for subsequent calls.
def conn_adapter
  return @adapter if @adapter

  @adapter = QC.default_conn_adapter
end

#conn_adapter=(a) ⇒ Object



15
16
17
# File 'lib/queue_classic/queue.rb', line 15

# Overrides the connection adapter for this queue by storing +a+ in
# @adapter, the same instance variable that #conn_adapter reads.
def conn_adapter=(a)
  @adapter = a
end

#count ⇒ Object



106
107
108
109
110
111
112
# File 'lib/queue_classic/queue.rb', line 106

# Counts the rows in TABLE_NAME whose q_name equals this queue's name.
#
# The query runs inside QC.log_yield with the measure label 'queue.count'.
# The "count" field of the adapter's result is converted with #to_i, so
# the block yields an Integer.
def count
  QC.log_yield(:measure => 'queue.count') do
    sql = "SELECT COUNT(*) FROM #{TABLE_NAME} WHERE q_name = $1"
    conn_adapter.execute(sql, name)["count"].to_i
  end
end

#delete(id) ⇒ Object



93
94
95
96
97
# File 'lib/queue_classic/queue.rb', line 93

# Deletes the row of TABLE_NAME whose id column equals +id+.
#
# The statement is executed through conn_adapter inside QC.log_yield with
# the measure label 'queue.delete'; the id is sent as bind parameter $1.
def delete(id)
  QC.log_yield(:measure => 'queue.delete') do
    sql = "DELETE FROM #{TABLE_NAME} where id = $1"
    conn_adapter.execute(sql, id)
  end
end

#delete_all ⇒ Object



99
100
101
102
103
104
# File 'lib/queue_classic/queue.rb', line 99

# Deletes every row of TABLE_NAME whose q_name equals this queue's name.
#
# Runs inside QC.log_yield with the measure label 'queue.delete_all'; the
# queue name is sent as bind parameter $1.
def delete_all
  QC.log_yield(:measure => 'queue.delete_all') do
    conn_adapter.execute("DELETE FROM #{TABLE_NAME} WHERE q_name = $1", name)
  end
end

#enqueue(method, *args) ⇒ Object

enqueue(m,a) inserts a row into the jobs table and triggers a notification. The job's queue is represented by a name column in the row. There is a trigger on the table which will send a NOTIFY event on a channel which corresponds to the name of the queue. The method argument is a string-encoded Ruby expression. The expression will be separated by a `.` character and then `eval`d. Examples of the method argument include: `puts`, `Kernel.puts`, `MyObject.new.puts`. The args argument will be encoded as JSON and stored as a JSON datatype in the row. (If the version of PG does not support JSON, then the args will be stored as text.) The args are stored as a collection and then splatted inside the worker. Examples of args include: `'hello world'`, `['hello world']`, `'hello', 'world'`.



37
38
39
40
41
42
# File 'lib/queue_classic/queue.rb', line 37

# Inserts one job row for this queue into TABLE_NAME.
#
# method - stored as-is in the method column (bind parameter $2).
# args   - remaining arguments; collected into an Array, serialized with
#          JSON.dump and stored in the args column (bind parameter $3).
#
# The queue name goes into q_name ($1). The insert runs inside
# QC.log_yield with the measure label 'queue.enqueue'.
def enqueue(method, *args)
  QC.log_yield(:measure => 'queue.enqueue') do
    insert_sql = "INSERT INTO #{TABLE_NAME} (q_name, method, args) VALUES ($1, $2, $3)"
    conn_adapter.execute(
      insert_sql,
      name,
      method,
      JSON.dump(args)
    )
  end
end

#enqueue_at(timestamp, method, *args) ⇒ Object

enqueue_at(t,m,a) inserts a row into the jobs table representing a job to be executed not before the specified time. The time argument must be a Time object or a float timestamp. The method and args arguments must be in the form described in the documentation for the #enqueue method.



49
50
51
52
# File 'lib/queue_classic/queue.rb', line 49

# Schedules a job for an absolute point in time.
#
# timestamp - anything Time.at accepts (e.g. a Time or a numeric epoch).
# method    - forwarded unchanged to #enqueue_in.
# args      - forwarded unchanged to #enqueue_in.
#
# The absolute time is turned into a delay in seconds relative to
# Time.now (the local clock of this process) and handed to #enqueue_in.
def enqueue_at(timestamp, method, *args)
  enqueue_in(Time.at(timestamp) - Time.now, method, *args)
end

#enqueue_in(seconds, method, *args) ⇒ Object

enqueue_in(t,m,a) inserts a row into the jobs table representing a job to be executed not before the specified time offset. The seconds argument must be an integer. The method and args arguments must be in the form described in the documentation for the #enqueue method.



59
60
61
62
63
64
65
# File 'lib/queue_classic/queue.rb', line 59

# Inserts one job row for this queue with a scheduled_at in the future.
#
# seconds - delay before the job may run; converted with #to_i (so any
#           fractional part is dropped) and embedded in the SQL as
#           "now() + interval 'N seconds'".
# method  - stored as-is in the method column (bind parameter $2).
# args    - collected into an Array, serialized with JSON.dump and stored
#           in the args column (bind parameter $3).
#
# Runs inside QC.log_yield with the measure label 'queue.enqueue'.
def enqueue_in(seconds, method, *args)
  QC.log_yield(:measure => 'queue.enqueue') do
    # Only the integer conversion is interpolated into the statement text.
    delay = seconds.to_i
    insert_sql = "INSERT INTO #{TABLE_NAME} (q_name, method, args, scheduled_at)
         VALUES ($1, $2, $3, now() + interval '#{delay} seconds')"
    conn_adapter.execute(
      insert_sql,
      name,
      method,
      JSON.dump(args)
    )
  end
end

#lock ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/queue_classic/queue.rb', line 67

# Calls the lock_head SQL function for this queue and converts the
# returned row into a job Hash.
#
# The query "SELECT * FROM lock_head($1, $2)" is executed with the queue
# name and top_bound as bind parameters, inside QC.log_yield with the
# measure label 'queue.lock'.
#
# When the adapter returns nothing (nil/false) the block yields nil.
# Otherwise it yields a Hash with the keys :id, :q_name, :method and
# :args (the row's "args" field parsed from JSON). If the row carries a
# "scheduled_at" value, it is parsed with Time.parse into :scheduled_at
# and the elapsed time since then is reported through QC.measure as
# "time-to-lock=<ms>ms source=<queue name>".
def lock
  QC.log_yield(:measure => 'queue.lock') do
    row = conn_adapter.execute("SELECT * FROM lock_head($1, $2)", name, top_bound)
    # `next` (not `return`) so control goes back to QC.log_yield normally.
    next unless row

    job = {
      :id => row["id"],
      :q_name => row["q_name"],
      :method => row["method"],
      :args => JSON.parse(row["args"])
    }

    scheduled = row["scheduled_at"]
    if scheduled
      job[:scheduled_at] = Time.parse(scheduled)
      elapsed_ms = Integer((Time.now - job[:scheduled_at]) * 1000)
      QC.measure("time-to-lock=#{elapsed_ms}ms source=#{name}")
    end

    job
  end
end

#unlock(id) ⇒ Object



86
87
88
89
90
91
# File 'lib/queue_classic/queue.rb', line 86

# Clears the lock on a job by setting locked_at to null for the row of
# TABLE_NAME whose id column equals +id+.
#
# Runs inside QC.log_yield with the measure label 'queue.unlock'; the id
# is sent as bind parameter $1.
def unlock(id)
  QC.log_yield(:measure => 'queue.unlock') do
    conn_adapter.execute("UPDATE #{TABLE_NAME} set locked_at = null where id = $1", id)
  end
end