Class: Postqueue::Queue
- Inherits:
-
Object
- Object
- Postqueue::Queue
- Defined in:
- lib/postqueue/queue/runner.rb,
lib/postqueue/queue.rb,
lib/postqueue/queue/logging.rb,
lib/postqueue/queue/callback.rb,
lib/postqueue/queue/processing.rb,
lib/postqueue/queue/select_and_lock.rb
Overview
The Postqueue processor processes items in a single Postqueue table.
Defined Under Namespace
Classes: Timing
Constant Summary collapse
- VALID_PROCESSING_VALUES =
[ :async, :sync, :verify ]
Instance Attribute Summary collapse
-
#default_batch_size ⇒ Object
The default batch size.
-
#item_class ⇒ Object
The AR::Base class to use.
-
#max_attemps ⇒ Object
readonly
maximum number of processing attempts.
Instance Method Summary collapse
- #assert_valid_op!(op) ⇒ Object
- #batch_size(op:) ⇒ Object
- #calculate_batch_size(op:, max_batch_size:) ⇒ Object
- #enqueue(op:, entity_id:) ⇒ Object
- #idempotent_operation?(op) ⇒ Boolean
-
#initialize {|_self| ... } ⇒ Queue
constructor
A new instance of Queue.
- #logger ⇒ Object
- #on(op, batch_size: nil, idempotent: nil, &block) ⇒ Object
-
#process(op: nil, batch_size: nil) ⇒ Object
Processes up to batch_size entries.
-
#process_one(op: nil) ⇒ Object
processes a single entry.
- #process_until_empty(op: nil, batch_size: nil) ⇒ Object
-
#processing(processing = nil) ⇒ Object
Sets or returns the processing mode.
- #run(&block) ⇒ Object
- #run! ⇒ Object
-
#select_and_lock(relation, limit:) ⇒ Object
Selects and locks up to `limit` unlocked items in the queue.
-
#select_and_lock_batch(op:, max_batch_size:) ⇒ Object
returns a batch of queue items for processing.
- #select_and_lock_duplicates(op:, entity_ids:) ⇒ Object
- #set_default_runner ⇒ Object
- #to_s ⇒ Object
-
#upcoming(relation = nil, subselect: true) ⇒ Object
:nodoc:.
Constructor Details
#initialize {|_self| ... } ⇒ Queue
Returns a new instance of Queue.
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/postqueue/queue.rb', line 27
#
# Builds a queue with default settings and registers the built-in handlers.
#
# Defaults set here: item class ::Postqueue::Item, default batch size 1,
# at most 5 attempts, :async processing, and no per-op batch sizes or
# idempotency flags.
#
# Handlers are registered for the "test" op (logs the entity ids), the
# "fail" op (always raises), and :missing_handler (raises MissingHandler).
# An exception callback is installed as well.
#
# Yields the new queue to the optional block so the caller can configure it.
def initialize(&block)
  @item_class = ::Postqueue::Item
  @default_batch_size = 1
  @max_attemps = 5
  @idempotent_operations = {}
  @batch_sizes = {}
  @processing = :async

  on "test" do |_op, entity_ids|
    Postqueue.logger.info "[test] processing entity_ids: #{entity_ids.inspect}"
  end

  on "fail" do |_op, entity_ids|
    raise "Postqueue test failure, w/entity_ids: #{entity_ids.inspect}"
  end

  on :missing_handler do |op, entity_ids|
    raise MissingHandler, queue: self, op: op, entity_ids: entity_ids
  end

  # Calls Kernel#raise without arguments, with the exception as receiver.
  # NOTE(review): presumably this re-raises the exception currently being
  # handled by the caller of the callback - confirm against on_exception.
  on_exception do |e, _, _|
    e.send :raise
  end

  yield self if block
end
Instance Attribute Details
#default_batch_size ⇒ Object
The default batch size. Will be used if no specific batch size is defined for an operation.
9 10 11 |
# File 'lib/postqueue/queue.rb', line 9
#
# Reader for the default batch size (the value held in @default_batch_size).
def default_batch_size
  @default_batch_size
end
#item_class ⇒ Object
The AR::Base class to use. You would only change this if you want to run the queue in a different database or in a different table.
5 6 7 |
# File 'lib/postqueue/queue.rb', line 5
#
# Reader for the class used to access queue items (the value held in
# @item_class).
def item_class
  @item_class
end
#max_attemps ⇒ Object (readonly)
maximum number of processing attempts.
12 13 14 |
# File 'lib/postqueue/queue.rb', line 12
#
# Reader for the maximum number of processing attempts (the value held in
# @max_attemps). The spelling "attemps" is part of the public interface.
def max_attemps
  @max_attemps
end
Instance Method Details
#assert_valid_op!(op) ⇒ Object
19 20 21 22 23 24 |
# File 'lib/postqueue/queue/callback.rb', line 19
#
# Verifies that +op+ may be used as an operation name.
#
# Accepted values are any String and the symbol :missing_handler.
# Returns nil for accepted values.
#
# Raises ArgumentError for anything else.
def assert_valid_op!(op)
  acceptable = op == :missing_handler || op.is_a?(String)
  raise ArgumentError, "Invalid op #{op.inspect}, must be a string" unless acceptable
end
#batch_size(op:) ⇒ Object
55 56 57 |
# File 'lib/postqueue/queue.rb', line 55
#
# Returns the batch size to use for +op+.
#
# Lookup order: the per-op value in @batch_sizes, then default_batch_size,
# then 1.
def batch_size(op:)
  per_op = @batch_sizes[op]
  return per_op if per_op

  default_batch_size || 1
end
#calculate_batch_size(op:, max_batch_size:) ⇒ Object
60 61 62 63 64 65 |
# File 'lib/postqueue/queue/select_and_lock.rb', line 60
#
# Works out how many items to fetch for +op+.
#
# Starts from batch_size(op:). A value below 2 always yields 1. Otherwise
# the result is capped by +max_batch_size+ when one is given (nil means
# no cap).
def calculate_batch_size(op:, max_batch_size:)
  recommended = batch_size(op: op)

  if recommended < 2
    1
  elsif max_batch_size && max_batch_size < recommended
    max_batch_size
  else
    recommended
  end
end
#enqueue(op:, entity_id:) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/postqueue/queue.rb', line 63
#
# Enqueues +entity_id+ for +op+ via item_class.enqueue and returns the
# number of enqueued items.
#
# Duplicates are ignored when idempotent_operation?(op) is true. If nothing
# was enqueued the method returns right away. Otherwise the processing mode
# decides what happens next:
#
# - :async  - nothing further is done here
# - :sync   - the queue is processed until empty for +op+
# - :verify - raises MissingHandler unless a callback exists for +op+
def enqueue(op:, entity_id:)
  count = item_class.enqueue(
    op: op,
    entity_id: entity_id,
    ignore_duplicates: idempotent_operation?(op)
  )
  return count unless count > 0

  case processing
  when :sync
    process_until_empty(op: op)
  when :verify
    unless callback_for(op: op)
      raise MissingHandler, queue: self, op: op, entity_ids: [entity_id]
    end
  end

  count
end
#idempotent_operation?(op) ⇒ Boolean
59 60 61 |
# File 'lib/postqueue/queue.rb', line 59
#
# Tells whether +op+ is flagged as idempotent.
#
# Uses the entry for +op+ in @idempotent_operations when present, otherwise
# the entry stored under "*", otherwise false.
def idempotent_operation?(op)
  return @idempotent_operations[op] if @idempotent_operations.key?(op)

  @idempotent_operations.fetch("*", false)
end
#logger ⇒ Object
31 32 33 |
# File 'lib/postqueue/queue/logging.rb', line 31
#
# Returns the logger used by this queue, which is Postqueue.logger.
def logger
  Postqueue.logger
end
#on(op, batch_size: nil, idempotent: nil, &block) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/postqueue/queue/callback.rb', line 26
#
# Registers +block+ as the callback for +op+ and optionally records per-op
# settings.
#
# op         - operation name; checked with assert_valid_op!.
# batch_size - when given, stored as the batch size for +op+.
# idempotent - when not nil, stored as the idempotency flag for +op+.
#
# Neither batch_size nor idempotent may be set for the op "*".
#
# Returns self, so calls can be chained.
#
# Raises ArgumentError when a per-op setting is passed for "*" (and whatever
# assert_valid_op! raises for an invalid op).
def on(op, batch_size: nil, idempotent: nil, &block)
  assert_valid_op! op

  # Validate all options before touching any state, so that a rejected call
  # does not leave a half-registered callback behind.
  if op == "*"
    raise ArgumentError, "Can't set per-op batchsize for op '*'" if batch_size
    raise ArgumentError, "Can't set idempotent for default op '*'" unless idempotent.nil?
  end

  callbacks[op] = block
  @batch_sizes[op] = batch_size if batch_size
  @idempotent_operations[op] = idempotent unless idempotent.nil?

  self
end
#process(op: nil, batch_size: nil) ⇒ Object
Processes up to batch_size entries
process batch_size: 100
7 8 9 10 11 |
# File 'lib/postqueue/queue/processing.rb', line 7
#
# Processes up to +batch_size+ entries, optionally restricted to +op+.
#
# The work is delegated to process_inside_transaction and wrapped in a
# transaction on item_class. Returns whatever that transaction block
# returns.
def process(op: nil, batch_size: nil)
  item_class.transaction { process_inside_transaction(op: op, batch_size: batch_size) }
end
#process_one(op: nil) ⇒ Object
processes a single entry
14 15 16 |
# File 'lib/postqueue/queue/processing.rb', line 14
#
# Processes a single entry: a shortcut for process with a batch size of 1.
def process_one(op: nil)
  process(batch_size: 1, op: op)
end
#process_until_empty(op: nil, batch_size: nil) ⇒ Object
18 19 20 21 22 23 24 25 26 |
# File 'lib/postqueue/queue/processing.rb', line 18
#
# Calls process repeatedly until it reports 0 processed items.
#
# Returns the sum of the counts reported by the individual process calls.
def process_until_empty(op: nil, batch_size: nil)
  total = 0
  until (processed = process(op: op, batch_size: batch_size)) == 0
    total += processed
  end
  total
end
#processing(processing = nil) ⇒ Object
Sets or returns the processing mode. When set, it must be one of :async, :sync or :verify.
18 19 20 21 22 23 24 25 |
# File 'lib/postqueue/queue.rb', line 18
#
# Combined getter/setter for the processing mode.
#
# Without an argument (or with nil) the current mode is returned. With an
# argument the mode is changed and the new mode is returned.
#
# Raises ArgumentError if the argument is not listed in
# VALID_PROCESSING_VALUES.
def processing(processing = nil)
  return @processing if processing.nil?

  if VALID_PROCESSING_VALUES.include?(processing)
    @processing = processing
  else
    raise ArgumentError, "Invalid processing value, must be one of #{VALID_PROCESSING_VALUES.inspect}"
  end
end
#run(&block) ⇒ Object
4 5 6 7 |
# File 'lib/postqueue/queue/runner.rb', line 4
#
# Stores +block+ as the runner when a block is given.
#
# Returns the currently stored runner (nil if none was ever set).
def run(&block)
  @run = block unless block.nil?
  @run
end
#run! ⇒ Object
9 10 11 12 |
# File 'lib/postqueue/queue/runner.rb', line 9
#
# Invokes the stored runner with this queue as its argument.
#
# If no runner has been stored yet, set_default_runner is called first.
def run!
  @run || set_default_runner
  @run.call(self)
end
#select_and_lock(relation, limit:) ⇒ Object
Selects and locks up to `limit` unlocked items in the queue. Used by select_and_lock_batch.
15 16 17 18 19 20 21 22 23 24 |
# File 'lib/postqueue/queue/select_and_lock.rb', line 15
#
# Selects and locks items matching +relation+, restricted to upcoming
# entries (see #upcoming).
#
# relation - relation to select from; passed through #upcoming.
# limit    - maximum number of rows, or nil for no LIMIT clause.
#
# Returns the result of item_class.find_by_sql for the generated SQL.
def select_and_lock(relation, limit:)
  # FOR UPDATE SKIP LOCKED selects and locks entries, but skips those that
  # are already locked - so this transaction does not wait on locks held by
  # other transactions.
  locking_sql = upcoming(relation).to_sql + " FOR UPDATE SKIP LOCKED"
  locking_sql << " LIMIT #{limit}" if limit

  item_class.find_by_sql(locking_sql)
end
#select_and_lock_batch(op:, max_batch_size:) ⇒ Object
Returns a batch of queue items for processing. These queue items are chosen depending on the passed in op: and batch_size: settings (if any).
All selected queue items will have the same op value. If an op: value is passed in, that one is used as a filter condition; otherwise the op value of the first queue entry is used instead.
This method will select and lock at most max_batch_size items. If the batch size configured in the queue is smaller than the value passed in here, the configured value is used instead.
Returns an array of item objects.
38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/postqueue/queue/select_and_lock.rb', line 38
#
# Selects and locks a batch of queue items that all share one op.
#
# op             - restricts the selection to this op; when nil, the op of
#                  the first selected item determines the batch.
# max_batch_size - upper bound handed to calculate_batch_size.
#
# The first matching item is locked on its own. If the calculated batch size
# is 1 or less, that single item is the result. Otherwise a second select
# fetches up to the calculated number of items with the same op.
#
# Returns an array of items; empty when nothing could be selected.
def select_and_lock_batch(op:, max_batch_size:)
  scope = item_class.all
  scope = scope.where(op: op) if op

  first_item = select_and_lock(scope, limit: 1).first
  return [] unless first_item

  limit = calculate_batch_size(op: first_item.op, max_batch_size: max_batch_size)
  return [first_item] if limit <= 1

  select_and_lock(scope.where(op: first_item.op), limit: limit)
end
#select_and_lock_duplicates(op:, entity_ids:) ⇒ Object
52 53 54 55 56 57 58 |
# File 'lib/postqueue/queue/select_and_lock.rb', line 52
#
# Selects and locks all upcoming items for +op+ whose entity_id is one of
# +entity_ids+. No limit is applied.
#
# Returns [] right away when +entity_ids+ is empty.
#
# Raises ArgumentError when +op+ is missing (nil or false).
def select_and_lock_duplicates(op:, entity_ids:)
  raise ArgumentError, "Missing op argument" unless op
  return [] if entity_ids.empty?

  select_and_lock(item_class.where(op: op, entity_id: entity_ids), limit: nil)
end
#set_default_runner ⇒ Object
14 15 16 17 18 19 20 21 22 23 |
# File 'lib/postqueue/queue/runner.rb', line 14
#
# Installs the default runner via #run.
#
# The default runner loops forever: it processes the given queue until it is
# empty, then sleeps for one second, logging a debug line before each step.
def set_default_runner
  run do |q|
    loop do
      q.logger.debug "#{q}: Processing until empty"
      q.process_until_empty

      q.logger.debug "#{q}: sleeping"
      sleep 1
    end
  end
end
#to_s ⇒ Object
35 36 37 |
# File 'lib/postqueue/queue/logging.rb', line 35 def to_s item_class.table_name end |
#upcoming(relation = nil, subselect: true) ⇒ Object
:nodoc:
3 4 5 6 7 8 9 10 11 |
# File 'lib/postqueue/queue/select_and_lock.rb', line 3
#
# Narrows +relation+ down to entries that are due for processing.
#
# relation  - relation to start from; defaults to item_class.all.
# subselect - when true, only the id, entity_id and op columns are selected.
#
# An entry is due when its failed_attempts is below max_attemps and its
# next_run_at lies before the current time.
def upcoming(relation = nil, subselect: true) #:nodoc:
  scope = relation.nil? ? item_class.all : relation
  scope = scope.select(:id, :entity_id, :op) if subselect

  # Ordering by next_run_at and id should not strictly be necessary, but
  # helps processing entries in the passed in order when enqueued at the
  # same time.
  scope
    .where("failed_attempts < ? AND next_run_at < ?", max_attemps, Time.now)
    .order(:next_run_at, :id)
end