Class: Postqueue::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/postqueue/queue/runner.rb,
lib/postqueue/queue.rb,
lib/postqueue/queue/logging.rb,
lib/postqueue/queue/callback.rb,
lib/postqueue/queue/processing.rb,
lib/postqueue/queue/select_and_lock.rb

Overview

The Postqueue processor processes items in a single Postqueue table.

Defined Under Namespace

Classes: Timing

Constant Summary collapse

# The processing modes accepted by #processing. Frozen so that the list of
# valid modes cannot be modified by accident at runtime.
VALID_PROCESSING_VALUES =
[ :async, :sync, :verify ].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize {|_self| ... } ⇒ Queue

Returns a new instance of Queue.

Yields:

  • (_self)

Yield Parameters:



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/postqueue/queue.rb', line 27

# Builds a queue with default settings and a set of built-in handlers.
#
# Defaults: items are managed via ::Postqueue::Item, the default batch size
# is 1, the maximum number of attempts is 5, no operation is marked
# idempotent, and the processing mode is :async.
#
# Built-in handlers registered here:
# - "test": logs the entity ids it receives.
# - "fail": always raises; useful to exercise failure handling.
# - :missing_handler: raises MissingHandler.
# - the handler registered via on_exception re-raises the exception.
#
# @yield [self] the new queue, so the caller can configure it further
def initialize(&block)
  @item_class = ::Postqueue::Item
  @default_batch_size = 1
  @max_attemps = 5
  @idempotent_operations = {}
  @batch_sizes = {}
  @processing = :async

  on "test" do |_op, entity_ids|
    Postqueue.logger.info "[test] processing entity_ids: #{entity_ids.inspect}"
  end

  on "fail" do |_op, entity_ids|
    raise "Postqueue test failure, w/entity_ids: #{entity_ids.inspect}"
  end

  on :missing_handler do |op, entity_ids|
    raise MissingHandler, queue: self, op: op, entity_ids: entity_ids
  end

  on_exception do |e, _, _|
    # Re-raise the exception that was handed in. Raising it explicitly does
    # not depend on $! still pointing at it when this handler runs.
    raise e
  end

  yield self if block
end

Instance Attribute Details

#default_batch_sizeObject

The default batch size. Will be used if no specific batch size is defined for an operation.



9
10
11
# File 'lib/postqueue/queue.rb', line 9

# Returns the default batch size, i.e. the value batch_size(op:) falls back
# to when no batch size is stored for the operation.
def default_batch_size
  @default_batch_size
end

#item_classObject

The AR::Base class to use. You would only change this if you want to run the queue in a different database or in a different table.



5
6
7
# File 'lib/postqueue/queue.rb', line 5

# Returns the class used to access queue items. The queue calls enqueue,
# transaction, all, where, find_by_sql and table_name on it.
def item_class
  @item_class
end

#max_attempsObject (readonly)

The maximum number of processing attempts.



12
13
14
# File 'lib/postqueue/queue.rb', line 12

# Returns the maximum number of processing attempts. Items whose
# failed_attempts reached this value are no longer selected by #upcoming.
def max_attemps
  @max_attemps
end

Instance Method Details

#assert_valid_op!(op) ⇒ Object

Raises:



19
20
21
22
23
24
# File 'lib/postqueue/queue/callback.rb', line 19

# Verifies that +op+ can be used as an operation identifier.
#
# An op is acceptable when it is the special symbol :missing_handler or any
# String. Returns nil for acceptable values.
#
# @raise [ArgumentError] if +op+ is anything else
def assert_valid_op!(op)
  unless op == :missing_handler || op.is_a?(String)
    raise ArgumentError, "Invalid op #{op.inspect}, must be a string"
  end
end

#batch_size(op:) ⇒ Object



55
56
57
# File 'lib/postqueue/queue.rb', line 55

# Returns the batch size to use for operation +op+.
#
# Lookup order: the size stored for this op, then the queue's default batch
# size, and finally 1 when neither is set.
def batch_size(op:)
  per_op = @batch_sizes[op]
  return per_op if per_op

  default_batch_size || 1
end

#calculate_batch_size(op:, max_batch_size:) ⇒ Object



60
61
62
63
64
65
# File 'lib/postqueue/queue/select_and_lock.rb', line 60

# Determines how many items to fetch for operation +op+.
#
# The size configured for the op (see batch_size) is the upper bound. A
# configured size below 2 always yields 1. When +max_batch_size+ is given,
# the smaller of the two values is returned.
def calculate_batch_size(op:, max_batch_size:)
  configured = batch_size(op: op)

  if configured < 2
    1
  elsif max_batch_size
    [configured, max_batch_size].min
  else
    configured
  end
end

#enqueue(op:, entity_id:) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/postqueue/queue.rb', line 63

# Enqueues +entity_id+ for operation +op+ and returns the value reported by
# item_class.enqueue (compared against 0 below, so expected to be a count).
#
# Duplicates are ignored when the operation is marked idempotent. If at
# least one item was enqueued, the processing mode decides what happens next:
# - :async  - nothing; items are processed elsewhere.
# - :sync   - the queue for +op+ is processed right away until it is empty.
# - :verify - MissingHandler is raised unless callback_for(op:) returns a
#             callback.
def enqueue(op:, entity_id:)
  count = item_class.enqueue(
    op: op,
    entity_id: entity_id,
    ignore_duplicates: idempotent_operation?(op)
  )

  if count > 0
    case processing
    when :sync
      process_until_empty(op: op)
    when :verify
      handler = callback_for(op: op)
      raise(MissingHandler, queue: self, op: op, entity_ids: [entity_id]) unless handler
    end
  end

  count
end

#idempotent_operation?(op) ⇒ Boolean

Returns:



59
60
61
# File 'lib/postqueue/queue.rb', line 59

# Tells whether operation +op+ is flagged as idempotent.
#
# Uses the flag stored for +op+ if there is one; otherwise falls back to the
# flag stored under "*", and to false when that is missing as well.
def idempotent_operation?(op)
  flags = @idempotent_operations
  return flags[op] if flags.key?(op)

  flags.fetch("*", false)
end

#loggerObject



31
32
33
# File 'lib/postqueue/queue/logging.rb', line 31

# Returns the logger used by this queue, which is the module-wide
# Postqueue.logger.
def logger
  Postqueue.logger
end

#on(op, batch_size: nil, idempotent: nil, &block) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/postqueue/queue/callback.rb', line 26

# Registers +block+ as the callback for operation +op+, optionally together
# with a per-op batch size and idempotency flag.
#
# All arguments are validated before any state is changed, so a rejected
# call leaves the queue untouched.
#
# @param op [String, Symbol] the operation; checked by assert_valid_op!
# @param batch_size [Integer, nil] per-op batch size; not allowed for "*"
# @param idempotent [Boolean, nil] per-op idempotency flag; not allowed for "*"
# @return [self] so that calls can be chained
# @raise [ArgumentError] if +op+ is invalid, or if +batch_size+ or
#   +idempotent+ is given for the default op "*"
def on(op, batch_size: nil, idempotent: nil, &block)
  assert_valid_op! op

  if op == "*"
    raise ArgumentError, "Can't set per-op batchsize for op '*'" if batch_size
    raise ArgumentError, "Can't set idempotent for default op '*'" unless idempotent.nil?
  end

  callbacks[op] = block
  @batch_sizes[op] = batch_size if batch_size
  # false is a meaningful value here, so only nil means "not specified".
  @idempotent_operations[op] = idempotent unless idempotent.nil?

  self
end

#process(op: nil, batch_size: nil) ⇒ Object

Processes up to batch_size entries

process batch_size: 100



7
8
9
10
11
# File 'lib/postqueue/queue/processing.rb', line 7

# Processes up to +batch_size+ entries, optionally restricted to operation
# +op+. The work is delegated to process_inside_transaction and wrapped in
# item_class.transaction; that block's value is what the transaction yields
# back.
def process(op: nil, batch_size: nil)
  item_class.transaction { process_inside_transaction(op: op, batch_size: batch_size) }
end

#process_one(op: nil) ⇒ Object

Processes a single entry.



14
15
16
# File 'lib/postqueue/queue/processing.rb', line 14

# Processes a single entry by calling #process with a batch size of 1,
# optionally restricted to operation +op+.
def process_one(op: nil)
  process(op: op, batch_size: 1)
end

#process_until_empty(op: nil, batch_size: nil) ⇒ Object



18
19
20
21
22
23
24
25
26
# File 'lib/postqueue/queue/processing.rb', line 18

# Calls #process repeatedly until it reports 0 processed items and returns
# the total number of items processed across all rounds.
def process_until_empty(op: nil, batch_size: nil)
  total = 0

  loop do
    handled = process(op: op, batch_size: batch_size)
    return total if handled == 0

    total += handled
  end

  total
end

#processing(processing = nil) ⇒ Object

Sets or returns the processing mode. The mode must be one of :async, :sync or :verify.



18
19
20
21
22
23
24
25
# File 'lib/postqueue/queue.rb', line 18

# Reads or sets the processing mode.
#
# Without an argument (or with nil) the current mode is returned. With an
# argument the mode is changed and the new mode is returned.
#
# @raise [ArgumentError] if the given mode is not listed in
#   VALID_PROCESSING_VALUES
def processing(processing = nil)
  if processing.nil?
    @processing
  elsif VALID_PROCESSING_VALUES.include?(processing)
    @processing = processing
  else
    raise ArgumentError, "Invalid processing value, must be one of #{VALID_PROCESSING_VALUES.inspect}"
  end
end

#run(&block) ⇒ Object



4
5
6
7
# File 'lib/postqueue/queue/runner.rb', line 4

# Reads or sets the runner block.
#
# With a block: stores it as the runner and returns it.
# Without a block: returns the runner stored so far (nil if none).
def run(&block)
  return @run unless block

  @run = block
end

#run!Object



9
10
11
12
# File 'lib/postqueue/queue/runner.rb', line 9

# Invokes the runner with this queue as its argument and returns the
# runner's result. If no runner has been set yet, set_default_runner is
# called first.
def run!
  @run || set_default_runner

  runner = @run
  runner.call(self)
end

#select_and_lock(relation, limit:) ⇒ Object

Selects and locks up to limit unlocked items in the queue. Used by select_and_lock_batch.



15
16
17
18
19
20
21
22
23
24
# File 'lib/postqueue/queue/select_and_lock.rb', line 15

# Selects and locks up to +limit+ unlocked, upcoming items matching
# +relation+. Used by select_and_lock_batch and select_and_lock_duplicates.
#
# @param relation the relation to restrict the selection to; it is passed
#   through #upcoming first
# @param limit [Integer, nil] maximum number of rows; nil means no limit
# @return the result of item_class.find_by_sql
# @raise [ArgumentError, TypeError] if +limit+ is given but cannot be
#   converted to an Integer
def select_and_lock(relation, limit:)
  relation = upcoming(relation)

  # FOR UPDATE SKIP LOCKED selects and locks entries, but skips those that
  # are already locked - preventing this transaction from being blocked.
  sql = "#{relation.to_sql} FOR UPDATE SKIP LOCKED"

  # The limit is spliced into the SQL text, so force it to be an Integer
  # instead of trusting whatever the caller passed in.
  sql += " LIMIT #{Integer(limit)}" if limit

  item_class.find_by_sql(sql)
end

#select_and_lock_batch(op:, max_batch_size:) ⇒ Object

Returns a batch of queue items for processing. These queue items are chosen depending on the passed-in op: and max_batch_size: settings (if any).

All selected queue items will have the same op value. If an op: value is passed in, that one is used as a filter condition; otherwise the op value of the first queue entry is used instead.

This method will select and lock at most max_batch_size items. If the batch size configured in the queue for the op is smaller than the value passed in here, the configured one is used instead.

Returns an array of item objects.



38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/postqueue/queue/select_and_lock.rb', line 38

# Returns a batch of locked queue items that all share the same op.
#
# If +op+ is given it is used as a filter; otherwise the op of the first
# selectable item decides which op the batch is for. The batch size is
# determined by calculate_batch_size from that op and +max_batch_size+.
#
# Returns an empty array when nothing could be selected, a one-element array
# when the calculated batch size is 1 or less, and the result of a second
# select_and_lock call otherwise.
def select_and_lock_batch(op:, max_batch_size:)
  scope = op ? item_class.all.where(op: op) : item_class.all

  # Lock a single item first; its op determines the op of the whole batch.
  head = select_and_lock(scope, limit: 1).first
  return [] unless head

  limit = calculate_batch_size(op: head.op, max_batch_size: max_batch_size)

  if limit <= 1
    [ head ]
  else
    select_and_lock(scope.where(op: head.op), limit: limit)
  end
end

#select_and_lock_duplicates(op:, entity_ids:) ⇒ Object

Raises:



52
53
54
55
56
57
58
# File 'lib/postqueue/queue/select_and_lock.rb', line 52

# Selects and locks all selectable items with the given +op+ whose entity_id
# is one of +entity_ids+. No limit is applied.
#
# Returns an empty array without querying when +entity_ids+ is empty.
#
# @raise [ArgumentError] if +op+ is nil or false
def select_and_lock_duplicates(op:, entity_ids:)
  raise ArgumentError, "Missing op argument" unless op

  if entity_ids.empty?
    []
  else
    duplicates = item_class.where(op: op, entity_id: entity_ids)
    select_and_lock(duplicates, limit: nil)
  end
end

#set_default_runnerObject



14
15
16
17
18
19
20
21
22
23
# File 'lib/postqueue/queue/runner.rb', line 14

# Installs the default runner via #run. run! calls this when no runner has
# been set.
#
# The runner never returns on its own: it loops forever, processing the
# queue until it is empty and then sleeping for one second before the next
# round.
def set_default_runner
  run do |queue|
    loop do
      queue.logger.debug "#{queue}: Processing until empty"
      queue.process_until_empty
      queue.logger.debug "#{queue}: sleeping"
      # Wait one second between polling rounds.
      sleep 1
    end
  end
end

#to_sObject



35
36
37
# File 'lib/postqueue/queue/logging.rb', line 35

# Returns the table name of the item class as the queue's string
# representation (used, for instance, in the default runner's log lines).
def to_s
  item_class.table_name
end

#upcoming(relation = nil, subselect: true) ⇒ Object

:nodoc:



3
4
5
6
7
8
9
10
11
# File 'lib/postqueue/queue/select_and_lock.rb', line 3

# Narrows +relation+ (all items when nil) down to the entries that are due
# for processing: fewer failed attempts than max_attemps and a next_run_at
# in the past. With +subselect+ set, only id, entity_id and op are selected.
def upcoming(relation = nil, subselect: true) #:nodoc:
  scope = relation.nil? ? item_class.all : relation
  scope = scope.select(:id, :entity_id, :op) if subselect

  # Ordering by next_run_at and id should not strictly be necessary, but it
  # helps to process entries in enqueueing order when they were enqueued at
  # the same time.
  due = scope.where("failed_attempts < ? AND next_run_at < ?", max_attemps, Time.now)
  due.order(:next_run_at, :id)
end