Class: Gitlab::BatchPopQueueing

Inherits:
Object
  • Object
Defined in:
lib/gitlab/batch_pop_queueing.rb

Overview

This class is a queueing system for processing expensive tasks atomically, using batch popping to let you optimize the total processing time.

In a typical queueing system, the first item starts processing immediately and each following item waits until it is popped from the queue in turn. This queueing system also starts processing the first item immediately, but it pops all the remaining enqueued items as a single batch. This is especially useful when you want to drop redundant items from the queue and process only the important ones, which makes it more efficient than a traditional queueing system for such workloads.

Caveats:

  • The order of the items is not guaranteed because they are stored with `sadd` (Redis Sets).
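
To make the batch-pop idea and the ordering caveat concrete, here is a minimal in-memory sketch. It uses Ruby's `Set` as a stand-in for a Redis Set; `InMemoryBatchQueue` and its methods are hypothetical names chosen for illustration and are not part of `Gitlab::BatchPopQueueing`.

```ruby
require 'set'

# Hypothetical in-memory stand-in for a queue backed by a Redis Set.
class InMemoryBatchQueue
  def initialize
    @items = Set.new
  end

  # Like SADD: adding an item twice keeps a single member.
  def enqueue(new_items)
    @items.merge(new_items)
    self
  end

  # Remove and return everything that is currently queued, in one step.
  def pop_all
    popped = @items.to_a
    @items.clear
    popped
  end

  def size
    @items.size
  end
end

queue = InMemoryBatchQueue.new
queue.enqueue(%w[a b]).enqueue(%w[b c])

batch = queue.pop_all
# Ruby's Set happens to keep insertion order, but a Redis Set does not,
# so callers should not rely on the order of the popped batch.
puts batch.sort.inspect
puts queue.size
```

Because duplicates collapse on enqueue and the whole batch is handed over at once, the caller can choose which items are worth processing instead of handling them one by one.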

Example:

```ruby
class TheWorker
  def perform
    result = Gitlab::BatchPopQueueing.new('feature', 'queue').safe_execute([item]) do |items_in_queue|
      item = extract_the_most_important_item_from(items_in_queue)
      expensive_process(item)
    end

    if result[:status] == :finished && result[:new_items].present?
      # Items that were enqueued while the block above was running.
      item = extract_the_most_important_item_from(result[:new_items])
      TheWorker.perform_async(item.id)
    end
  end
end
```

Constant Summary

EXTRA_QUEUE_EXPIRE_WINDOW =
1.hour
MAX_COUNTS_OF_POP_ALL =
1000

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(namespace, queue_id) ⇒ BatchPopQueueing

Initialize queue

Parameters:

  • namespace (String)

    The namespace of the exclusive lock and queue key. Typically, it's a feature name.

  • queue_id (String)

    The identifier of the queue.

Raises:

  • (ArgumentError)

# File 'lib/gitlab/batch_pop_queueing.rb', line 46

def initialize(namespace, queue_id)
  raise ArgumentError if namespace.empty? || queue_id.empty?

  @namespace, @queue_id = namespace, queue_id
end

Instance Attribute Details

#namespace ⇒ Object (readonly)

Returns the value of attribute namespace.


# File 'lib/gitlab/batch_pop_queueing.rb', line 36

def namespace
  @namespace
end

#queue_id ⇒ Object (readonly)

Returns the value of attribute queue_id.


# File 'lib/gitlab/batch_pop_queueing.rb', line 36

def queue_id
  @queue_id
end

Instance Method Details

#safe_execute(new_items, lock_timeout: 10.minutes, &block) ⇒ Hash

Executes the given block under an exclusive lock. If another thread is already executing the block, the items are enqueued without running the block.

NOTE: If an exception is raised in the block, the popped items are not recovered.

We should NOT re-enqueue the items in this case because it could end up in an infinite loop.

Parameters:

  • new_items (Array<String>)

    New items to be added to the queue.

  • lock_timeout (Time) (defaults to: 10.minutes)

    The timeout of the exclusive lock. Generally, this value should be longer than the maximum processing time of the given block.

Returns:

  • (Hash)
    • status => Either `:enqueued` or `:finished`.

    • new_items => Items that were enqueued while the given block was being processed. Present only when status is `:finished`.
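
As an illustration of how a caller might interpret this hash, the sketch below extracts the items that still need a follow-up run. `follow_up_items` is a hypothetical helper written in plain Ruby (no ActiveSupport), not something this class provides.

```ruby
# Hypothetical helper: returns the items that arrived while the block was
# running, or an empty array when the call only enqueued its items.
def follow_up_items(result)
  return [] unless result[:status] == :finished

  # Array() turns a missing (nil) :new_items into an empty array.
  Array(result[:new_items])
end

enqueued_only = { status: :enqueued }
finished_with_more = { status: :finished, new_items: %w[42 43] }

puts follow_up_items(enqueued_only).inspect
puts follow_up_items(finished_with_more).inspect
```

A non-empty result suggests scheduling another run, as the worker example in the overview does.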


# File 'lib/gitlab/batch_pop_queueing.rb', line 65

def safe_execute(new_items, lock_timeout: 10.minutes, &block)
  enqueue(new_items, lock_timeout + EXTRA_QUEUE_EXPIRE_WINDOW)

  lease = Gitlab::ExclusiveLease.new(lock_key, timeout: lock_timeout)

  return { status: :enqueued } unless uuid = lease.try_obtain

  begin
    all_args = pop_all

    yield all_args if block_given?

    { status: :finished, new_items: peek_all }
  ensure
    Gitlab::ExclusiveLease.cancel(lock_key, uuid)
  end
end
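
The control flow above can be tried out without Redis using a single-process model. This is only a sketch under stated assumptions: a `Mutex` stands in for `Gitlab::ExclusiveLease`, a `Set` stands in for the Redis Set, lock timeouts and key expiry are ignored, and `InMemoryBatchPopQueue` is a hypothetical name.

```ruby
require 'set'

# Hypothetical single-process model of the safe_execute flow.
class InMemoryBatchPopQueue
  def initialize
    @queue = Set.new
    @lock = Mutex.new
  end

  def safe_execute(new_items)
    # Items are always enqueued first, whether or not the lock is free.
    @queue.merge(new_items)

    # If the lock is already held, report :enqueued and do nothing else.
    return { status: :enqueued } unless @lock.try_lock

    begin
      # Pop everything that is queued right now as one batch.
      batch = @queue.to_a
      @queue.clear

      yield batch if block_given?

      # Whatever was enqueued while the block ran is reported back.
      { status: :finished, new_items: @queue.to_a }
    ensure
      @lock.unlock
    end
  end
end

queue = InMemoryBatchPopQueue.new
inner = nil
processed = nil

outer = queue.safe_execute(%w[a b]) do |items|
  processed = items.sort
  # A call made while the block runs cannot take the lock, so it only enqueues.
  inner = queue.safe_execute(%w[c])
end

puts processed.inspect
puts inner.inspect
puts outer.inspect
```

The nested call models a second worker arriving mid-processing: it returns `:enqueued`, and its item shows up in the outer call's `new_items`.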