Class: QC::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/queue_classic/queue.rb

Overview

The queue class maps a queue abstraction onto a database table.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, top_bound = nil) ⇒ Queue

Returns a new instance of Queue.



12
13
14
15
# File 'lib/queue_classic/queue.rb', line 12

# Sets up a queue object for the given queue name.
#
# @param name the queue name; stored in @name and passed as the q_name
#   bind value by the query methods of this class
# @param top_bound optional bound stored in @top_bound; when nil (or false)
#   the value of QC.top_bound is used instead
def initialize(name, top_bound=nil)
  @name = name
  @top_bound = top_bound
  # Only consult the library-wide default when no explicit bound was given.
  @top_bound ||= QC.top_bound
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



10
11
12
# File 'lib/queue_classic/queue.rb', line 10

# Read-only accessor for the queue name.
#
# @return the value currently held in @name
def name
  @name
end

#top_bound ⇒ Object (readonly)

Returns the value of attribute top_bound.



10
11
12
# File 'lib/queue_classic/queue.rb', line 10

# Read-only accessor for the queue's top bound.
#
# @return the value currently held in @top_bound
def top_bound
  @top_bound
end

Instance Method Details

#conn_adapter ⇒ Object



21
22
23
# File 'lib/queue_classic/queue.rb', line 21

# Returns the connection adapter this queue sends its SQL through.
#
# The adapter is looked up lazily: the first call stores
# QC.default_conn_adapter in @adapter, later calls reuse that stored value.
# An adapter assigned through #conn_adapter= takes precedence.
#
# @return the adapter held in @adapter
def conn_adapter
  return @adapter if @adapter

  @adapter = QC.default_conn_adapter
end

#conn_adapter=(a) ⇒ Object



17
18
19
# File 'lib/queue_classic/queue.rb', line 17

# Overrides the connection adapter used by this queue.
#
# @param a the adapter to store in @adapter; #conn_adapter returns it from
#   then on instead of falling back to QC.default_conn_adapter
def conn_adapter=(a)
  @adapter = a
end

#count ⇒ Object

Count the number of jobs in a specific queue. This returns all jobs, including ones that are scheduled in the future.



151
152
153
# File 'lib/queue_classic/queue.rb', line 151

# Counts every job row belonging to this queue, with no filter on
# scheduled_at (so jobs scheduled in the future are included).
#
# @return whatever _count returns for the 'queue.count' measurement
def count
  sql = "SELECT COUNT(*) FROM #{QC.table_name} WHERE q_name = $1"
  _count('queue.count', sql)
end

#count_ready ⇒ Object

Count the number of jobs in a specific queue, except ones scheduled in the future.



156
157
158
# File 'lib/queue_classic/queue.rb', line 156

# Counts the jobs in this queue whose scheduled_at is not in the future
# (scheduled_at <= now()).
#
# The measurement label is 'queue.count_ready' so these calls are not
# reported under the same label as #count_scheduled.
#
# @return whatever _count returns for the 'queue.count_ready' measurement
def count_ready
  _count('queue.count_ready', "SELECT COUNT(*) FROM #{QC.table_name} WHERE q_name = $1 AND scheduled_at <= now()")
end

#count_scheduled ⇒ Object

Count the number of jobs in a specific queue scheduled in the future.



161
162
163
# File 'lib/queue_classic/queue.rb', line 161

# Counts the jobs in this queue whose scheduled_at lies in the future
# (scheduled_at > now()).
#
# @return whatever _count returns for the 'queue.count_scheduled' measurement
def count_scheduled
  sql = "SELECT COUNT(*) FROM #{QC.table_name} WHERE q_name = $1 AND scheduled_at > now()"
  _count('queue.count_scheduled', sql)
end

#delete(id) ⇒ Object



136
137
138
139
140
# File 'lib/queue_classic/queue.rb', line 136

# Deletes the job row with the given id, wrapped in a 'queue.delete'
# measurement.
#
# @param id the job id bound to $1 in the DELETE statement
# @return the value of the QC.log_yield call wrapping the statement
def delete(id)
  QC.log_yield(measure: 'queue.delete') do
    sql = "DELETE FROM #{QC.table_name} WHERE id = $1"
    conn_adapter.execute(sql, id)
  end
end

#delete_all ⇒ Object



142
143
144
145
146
147
# File 'lib/queue_classic/queue.rb', line 142

# Deletes every job row whose q_name equals this queue's name, wrapped in a
# 'queue.delete_all' measurement.
#
# @return the value of the QC.log_yield call wrapping the statement
def delete_all
  QC.log_yield(measure: 'queue.delete_all') do
    conn_adapter.execute("DELETE FROM #{QC.table_name} WHERE q_name = $1", name)
  end
end

#enqueue(method, *args) ⇒ Object

enqueue(m,a) inserts a row into the jobs table and triggers a notification. The job's queue is represented by a name column in the row. There is a trigger on the table which will send a NOTIFY event on a channel which corresponds to the name of the queue. The method argument is a string-encoded Ruby expression. The expression will be separated by a `.` character and then `eval`d. Examples of the method argument include: `puts`, `Kernel.puts`, `MyObject.new.puts`. The args argument will be encoded as JSON and stored as a JSON datatype in the row. (If the version of PG does not support JSON, then the args will be stored as text.) The args are stored as a collection and then splatted inside the worker. Examples of args include: `'hello world'`, `['hello world']`, `'hello', 'world'`. This method returns a hash with the id of the enqueued job.



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/queue_classic/queue.rb', line 40

# Inserts a job row for this queue, wrapped in a 'queue.enqueue' measurement.
#
# The INSERT is attempted at most twice: the first PG::Error triggers one
# retry, a second PG::Error is re-raised to the caller.
#
# @param method the value bound to the method column
# @param args remaining arguments; serialized with JSON.dump as one array
#   and bound to the args column
# @return the value of the QC.log_yield call wrapping the INSERT
# @raise [PG::Error] when the second attempt fails as well
def enqueue(method, *args)
  QC.log_yield(measure: 'queue.enqueue') do
    insert_sql = "INSERT INTO #{QC.table_name} (q_name, method, args) VALUES ($1, $2, $3) RETURNING id"
    failures = 0
    begin
      conn_adapter.execute(insert_sql, name, method, JSON.dump(args))
    rescue PG::Error
      failures += 1
      raise if failures >= 2

      retry
    end
  end
end

#enqueue_at(timestamp, method, *args) ⇒ Object

enqueue_at(t,m,a) inserts a row into the jobs table representing a job to be executed not before the specified time. The time argument must be a Time object or a float timestamp. The method and args argument must be in the form described in the documentation for the #enqueue method. This method returns a hash with the id of the enqueued job.



62
63
64
65
# File 'lib/queue_classic/queue.rb', line 62

# Schedules a job for a point in time by converting it into a whole-second
# offset from the current time and delegating to #enqueue_in.
#
# @param timestamp anything Time.at accepts (e.g. a Time or a numeric
#   epoch value)
# @param method forwarded unchanged to #enqueue_in
# @param args forwarded unchanged to #enqueue_in
# @return the return value of #enqueue_in
def enqueue_at(timestamp, method, *args)
  target_epoch = Time.at(timestamp).to_i
  enqueue_in(target_epoch - Time.now.to_i, method, *args)
end

#enqueue_in(seconds, method, *args) ⇒ Object

enqueue_in(t,m,a) inserts a row into the jobs table representing a job to be executed not before the specified time offset. The seconds argument must be an integer. The method and args argument must be in the form described in the documentation for the #enqueue method. This method returns a hash with the id of the enqueued job.



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/queue_classic/queue.rb', line 73

# Inserts a job row for this queue with scheduled_at set to now() plus the
# given number of seconds, wrapped in a 'queue.enqueue' measurement.
#
# The offset is coerced with #to_i before being placed into the interval
# literal. The INSERT is attempted at most twice: the first PG::Error
# triggers one retry, a second PG::Error is re-raised to the caller.
#
# @param seconds offset in seconds; converted with #to_i
# @param method the value bound to the method column
# @param args remaining arguments; serialized with JSON.dump as one array
#   and bound to the args column
# @return the value of the QC.log_yield call wrapping the INSERT
# @raise [PG::Error] when the second attempt fails as well
def enqueue_in(seconds, method, *args)
  QC.log_yield(measure: 'queue.enqueue') do
    insert_sql = "INSERT INTO #{QC.table_name} (q_name, method, args, scheduled_at)
         VALUES ($1, $2, $3, now() + interval '#{seconds.to_i} seconds')
         RETURNING id"
    failures = 0
    begin
      conn_adapter.execute(insert_sql, name, method, JSON.dump(args))
    rescue PG::Error
      failures += 1
      raise if failures >= 2

      retry
    end
  end
end

#lock ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/queue_classic/queue.rb', line 91

# Claims one ready job from this queue and returns it as a hash.
#
# The statement selects a single row whose q_name matches this queue, whose
# locked_at is NULL and whose scheduled_at is not in the future, skipping
# rows that are already row-locked (FOR NO KEY UPDATE SKIP LOCKED). It then
# stamps locked_at = now() and locked_by = pg_backend_pid() on that row.
# The table name is taken from QC.table_name, as in the other statements of
# this class, instead of being hard-coded.
#
# When the row carries a scheduled_at value, the elapsed time between that
# value and now is reported through QC.measure as "time-to-lock".
#
# @return [Hash, nil] a hash with :id, :q_name, :method, :args (parsed from
#   JSON) and, when present on the row, :scheduled_at as a Time; nil when
#   the adapter returns no row
def lock
  QC.log_yield(:measure => 'queue.lock') do
    table = QC.table_name
    s = <<~SQL
      WITH selected_job AS (
        SELECT id
        FROM #{table}
        WHERE
          locked_at IS NULL AND
          q_name = $1 AND
          scheduled_at <= now()
        LIMIT 1
        FOR NO KEY UPDATE SKIP LOCKED
      )
      UPDATE #{table}
      SET
        locked_at = now(),
        locked_by = pg_backend_pid()
      FROM selected_job
      WHERE #{table}.id = selected_job.id
      RETURNING *
    SQL

    row = conn_adapter.execute(s, name)
    if row
      job = {
        id: row["id"],
        q_name: row["q_name"],
        method: row["method"],
        args: JSON.parse(row["args"])
      }
      scheduled_at = row["scheduled_at"]
      if scheduled_at
        # The adapter may hand back either a Time or its string form.
        job[:scheduled_at] = scheduled_at.kind_of?(Time) ? scheduled_at : Time.parse(scheduled_at)
        ttl = Integer((Time.now - job[:scheduled_at]) * 1000)
        QC.measure("time-to-lock=#{ttl}ms source=#{name}")
      end
      job
    end
  end
end

#unlock(id) ⇒ Object



129
130
131
132
133
134
# File 'lib/queue_classic/queue.rb', line 129

# Releases a job by resetting its locked_at column to NULL, wrapped in a
# 'queue.unlock' measurement.
#
# @param id the job id bound to $1 in the UPDATE statement
# @return the value of the QC.log_yield call wrapping the statement
def unlock(id)
  QC.log_yield(measure: 'queue.unlock') do
    conn_adapter.execute("UPDATE #{QC.table_name} SET locked_at = NULL WHERE id = $1", id)
  end
end