Class: Que::Poller

Inherits:
Object
  • Object
show all
Defined in:
lib/que/poller.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection:, queue:, poll_interval:) ⇒ Poller

Returns a new instance of Poller.



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/que/poller.rb', line 122

# Builds a poller tied to a single connection and queue.
#
# Nothing is queried here; the poll bookkeeping fields start out nil and are
# only filled in by #poll.
#
# @param connection [Object] stored as-is; `backend_pid` is read from it only
#   if the internal-log block is actually evaluated
# @param queue [Object] stored as-is and later passed as the first parameter
#   of the poll query
# @param poll_interval [Numeric, nil] stored as-is; compared against elapsed
#   seconds in #poll_interval_elapsed?, where nil disables that check
def initialize(connection:, queue:, poll_interval:)
  @connection, @queue, @poll_interval = connection, queue, poll_interval

  # No poll has happened yet.
  @last_polled_at, @last_poll_satisfied = nil, nil

  # The payload is built inside the block so it is only computed when the
  # logger chooses to call it.
  Que.internal_log(:poller_instantiate, self) do
    { backend_pid: connection.backend_pid, queue: queue, poll_interval: poll_interval }
  end
end

Instance Attribute Details

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



115
116
117
# File 'lib/que/poller.rb', line 115

# Reader for the connection handed to the constructor.
#
# @return [Object] the current value of @connection
def connection; @connection; end

#last_poll_satisfied ⇒ Object (readonly)

Returns the value of attribute last_poll_satisfied.



115
116
117
# File 'lib/que/poller.rb', line 115

# Reader for the outcome recorded by the most recent poll.
#
# The constructor sets this to nil; #poll overwrites it with the result of
# `poll_satisfied?`.
#
# @return [Object, nil] the current value of @last_poll_satisfied
def last_poll_satisfied; @last_poll_satisfied; end

#last_polled_at ⇒ Object (readonly)

Returns the value of attribute last_polled_at.



115
116
117
# File 'lib/que/poller.rb', line 115

# Reader for the timestamp of the most recent poll.
#
# The constructor sets this to nil; #poll overwrites it with `Time.now`.
#
# @return [Time, nil] the current value of @last_polled_at
def last_polled_at; @last_polled_at; end

#poll_interval ⇒ Object (readonly)

Returns the value of attribute poll_interval.



115
116
117
# File 'lib/que/poller.rb', line 115

# Reader for the poll interval handed to the constructor.
#
# @return [Object, nil] the current value of @poll_interval
def poll_interval; @poll_interval; end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



115
116
117
# File 'lib/que/poller.rb', line 115

# Reader for the queue handed to the constructor.
#
# @return [Object] the current value of @queue
def queue; @queue; end

Class Method Details

.cleanup(connection) ⇒ Object



258
259
260
261
262
263
264
# File 'lib/que/poller.rb', line 258

# Drops the temporary polling helpers from the given connection: the two
# pg_temp functions first, then the pg_temp composite type.
#
# @param connection [Object] anything responding to `execute(sql)`
# @return [Object] whatever `connection.execute` returns
def cleanup(connection)
  teardown_sql = <<-SQL
    DROP FUNCTION pg_temp.que_highest_remaining_priority(jsonb);
    DROP FUNCTION pg_temp.lock_and_update_priorities(jsonb, que_jobs);
    DROP TYPE pg_temp.que_query_result;
  SQL

  connection.execute(teardown_sql)
end

.setup(connection) ⇒ Object

Manage some temporary infrastructure (specific to the connection) that we’ll use for polling. These could easily be created permanently in a migration, but that’d require another migration if we wanted to tweak them later.



194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
# File 'lib/que/poller.rb', line 194

# Creates, in the connection's pg_temp schema, the helper objects used for
# polling. Everything is sent to the connection as one SQL string:
#
# * `pg_temp.que_query_result` - composite type `(locked boolean,
#   remaining_priorities jsonb)`.
# * `pg_temp.lock_and_update_priorities(priorities jsonb, job que_jobs)` -
#   tries `pg_try_advisory_lock` on the job's id and returns a
#   `que_query_result`. It looks up the lowest priority key in `priorities`
#   that is >= the job's priority. When the lock was taken, that key is removed
#   if its count is 1, otherwise its count is decremented. When the lock was
#   not taken, `priorities` is returned unchanged.
# * `pg_temp.que_highest_remaining_priority(priorities jsonb)` - returns the
#   largest key of `priorities` as a smallint.
#
# NOTE(review): lock_and_update_priorities is declared STABLE even though it
# calls pg_try_advisory_lock, which has a side effect. Presumably this is
# intentional - confirm before changing the volatility.
#
# @param connection [Object] anything responding to `execute(sql)`
# @return [Object] whatever `connection.execute` returns
def setup(connection)
  # The heredoc below is plain SQL; the `--` lines inside it are SQL comments
  # and are sent to the server as part of the statement text.
  connection.execute <<-SQL
    -- Temporary composite type we need for our queries to work.
    CREATE TYPE pg_temp.que_query_result AS (
      locked boolean,
      remaining_priorities jsonb
    );

    CREATE FUNCTION pg_temp.lock_and_update_priorities(priorities jsonb, job que_jobs)
    RETURNS pg_temp.que_query_result
    AS $$
      WITH
        -- Take the lock in a CTE because we want to use the result
        -- multiple times while only taking the lock once.
        lock_taken AS (
          SELECT pg_try_advisory_lock((job).id) AS taken
        ),
        relevant AS (
          SELECT priority, count
          FROM (
            SELECT
              key::smallint AS priority,
              value::text::integer AS count
            FROM jsonb_each(priorities)
            ) t1
          WHERE priority >= (job).priority
          ORDER BY priority ASC
          LIMIT 1
        )
      SELECT
        (SELECT taken FROM lock_taken), -- R
        CASE (SELECT taken FROM lock_taken)
        WHEN false THEN
          -- Simple case - we couldn't lock the job, so don't update the
          -- priorities hash.
          priorities
        WHEN true THEN
          CASE count
          WHEN 1 THEN
            -- Remove the priority from the JSONB doc entirely, rather
            -- than leaving a zero entry in it.
            priorities - priority::text
          ELSE
            -- Decrement the value in the JSONB doc.
            jsonb_set(
              priorities,
              ARRAY[priority::text],
              to_jsonb(count - 1)
            )
          END
        END
      FROM relevant
    $$
    STABLE
    LANGUAGE SQL;

    CREATE FUNCTION pg_temp.que_highest_remaining_priority(priorities jsonb) RETURNS smallint AS $$
      SELECT max(key::smallint) FROM jsonb_each(priorities)
    $$
    STABLE
    LANGUAGE SQL;
  SQL
end

Instance Method Details

#poll(priorities:, held_locks:) ⇒ Object



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/que/poller.rb', line 142

# Runs the prepared `:poll_jobs` statement once, unless #should_poll? says
# there is no point polling right now.
#
# After the query, the poll time and the result of `poll_satisfied?` are
# recorded on the instance, and a `:poller_polled` internal-log event is
# offered to `Que.internal_log`.
#
# @param priorities [Object] serialised with `JSON.dump` as the third query
#   parameter and passed through to `poll_satisfied?`
# @param held_locks [#to_a] lock ids, joined with commas inside `{}` to form
#   the second query parameter (presumably read as a Postgres array literal
#   by the prepared statement - confirm against its definition)
# @return [Array<Metajob>, nil] nil when polling was skipped; otherwise the
#   query's result array with each row wrapped in a Metajob (in place)
def poll(priorities:, held_locks:)
  return unless should_poll?

  query_params = [
    @queue,
    "{#{held_locks.to_a.join(',')}}",
    JSON.dump(priorities),
  ]
  jobs = connection.execute_prepared(:poll_jobs, query_params)

  @last_polled_at      = Time.now
  @last_poll_satisfied = poll_satisfied?(priorities, jobs)

  # The payload is built lazily and must read the raw rows, so this has to
  # stay ahead of the in-place conversion below.
  Que.internal_log(:poller_polled, self) do
    {
      queue:        @queue,
      locked:       jobs.count,
      priorities:   priorities,
      held_locks:   held_locks.to_a,
      newly_locked: jobs.map { |row| row.fetch(:id) },
    }
  end

  # Mutates and returns the same array the connection handed back.
  jobs.map! { |row| Metajob.new(row) }
end

#poll_interval_elapsed? ⇒ Boolean

Returns:

  • (Boolean)


183
184
185
186
# File 'lib/que/poller.rb', line 183

# Reports whether more than `poll_interval` seconds have passed since the
# last recorded poll.
#
# @return [Boolean, nil] nil when no poll interval is configured; true when
#   no poll has been recorded yet or the interval has been exceeded; false
#   otherwise
def poll_interval_elapsed?
  interval = poll_interval
  return unless interval

  # Before the first poll there is no timestamp to measure from, and
  # subtracting nil from a Time raises TypeError. Treat "never polled" as
  # elapsed, matching how #should_poll? treats a poller that has never run.
  polled_at = last_polled_at
  return true unless polled_at

  (Time.now - polled_at) > interval
end

#should_poll? ⇒ Boolean

Returns:

  • (Boolean)


175
176
177
178
179
180
181
# File 'lib/que/poller.rb', line 175

# Decides whether #poll should query the database right now.
#
# @return [Boolean, nil] true when no poll outcome has been recorded yet or
#   the last outcome was exactly `true`; otherwise whatever
#   #poll_interval_elapsed? returns
def should_poll?
  # No recorded outcome means this poller has never polled.
  return true if last_poll_satisfied.nil?

  # The previous poll was satisfied, so there may well be more work waiting.
  return true if last_poll_satisfied == true

  # Otherwise only poll again once the configured interval has passed.
  poll_interval_elapsed?
end