Class: Libhoney::Queueing::SizedQueueWithTimeout

Inherits:
Object
  • Object
show all
Defined in:
lib/libhoney/queueing/sized_queue_with_timeout.rb

Overview

A queue implementation with optional size limit and optional timeouts on pop and push operations. Heavily influenced / liberally mimicking Avdi Grimm’s Tapas::Queue.

Defined Under Namespace

Classes: PopTimedOut, PushTimedOut

Instance Method Summary collapse

Constructor Details

#initialize(max_size = Float::INFINITY, options = {}) ⇒ SizedQueueWithTimeout

Returns a new instance of SizedQueueWithTimeout.

Parameters:

  • max_size (Integer, Float::INFINITY) (defaults to: Float::INFINITY)

    the size limit for this queue

  • options (Hash) (defaults to: {})

    optional dependencies to inject, primarily for testing

Options Hash (options):

  • :lock (QLock, Mutex)

    the lock for synchronizing queue state change

  • :space_available_condition (QCondition)

    the condition variable to wait/signal on for space being available in the queue; when provided, must be accompanied by an :item_available_condition and the shared :lock

  • :item_available_condition (QCondition)

    the condition variable to wait/signal on for an item being added to the queue; when provided, must be accompanied by an :space_available_condition and the shared :lock



26
27
28
29
30
31
32
# File 'lib/libhoney/queueing/sized_queue_with_timeout.rb', line 26

def initialize(max_size = Float::INFINITY, options = {})
  @items           = []
  @max_size        = max_size
  @lock            = options.fetch(:lock) { QLock.new }
  @space_available = options.fetch(:space_available_condition) { QCondition.new(@lock) }
  @item_available  = options.fetch(:item_available_condition) { QCondition.new(@lock) }
end

Instance Method Details

#clearObject

Removes all objects from the queue. They are cast into the abyss never to be seen again.



80
81
82
83
84
85
# File 'lib/libhoney/queueing/sized_queue_with_timeout.rb', line 80

def clear
  @lock.synchronize do
    @items = []
    @space_available.signal unless full?
  end
end

#pop(timeout = :never, &timeout_policy) ⇒ Object Also known as: deq, shift

Pop something off the queue.

Parameters:

  • timeout (Numeric, :never) (defaults to: :never)

    how long in seconds to wait for the queue to have an item available or :never to wait “forever”

  • timeout_policy (#call)

    defaults to -> { raise PopTimedOut } - a lambda/Proc/callable, what to do when the timeout expires

Returns:

  • (Object)

Raises:



65
66
67
68
69
70
71
72
73
# File 'lib/libhoney/queueing/sized_queue_with_timeout.rb', line 65

def pop(timeout = :never, &timeout_policy)
  timeout_policy ||= -> { raise PopTimedOut }

  wait_for_condition(@item_available, -> { !empty? }, timeout, timeout_policy) do
    item = @items.shift
    @space_available.signal unless full?
    item
  end
end

#push(obj, timeout = :never, &timeout_policy) ⇒ Object Also known as: enq, <<

Push something onto the queue.

Parameters:

  • obj (Object)

    the thing to add to the queue

  • timeout (Numeric, :never) (defaults to: :never)

    how long in seconds to wait for the queue to have space available or :never to wait “forever”

  • timeout_policy (#call)

    defaults to -> { raise PushTimedOut } - a lambda/Proc/callable, what to do when the timeout expires

Raises:



44
45
46
47
48
49
50
51
# File 'lib/libhoney/queueing/sized_queue_with_timeout.rb', line 44

def push(obj, timeout = :never, &timeout_policy)
  timeout_policy ||= -> { raise PushTimedOut }

  wait_for_condition(@space_available, -> { !full? }, timeout, timeout_policy) do
    @items.push(obj)
    @item_available.signal
  end
end