Class: Performer::Queue

Inherits:
Object
Defined in:
lib/performer/queue.rb

Overview

Similar to the stdlib Queue, but with a thread-safe way of closing it down.

Constructor Details

#initialize ⇒ Queue

Returns a new instance of Queue.



# File 'lib/performer/queue.rb', line 6

def initialize
  @queue = []
  @queue_mutex = Monitor.new
  @queue_cond = @queue_mutex.new_cond
  @undefined = {}
  @open = true
end

Instance Method Details

#close(obj = undefined) { ... } ⇒ Object?

Close the queue, optionally pushing an item onto the queue right before close.

Examples:

close and enqueue

queue.close(object) do
  raise "Queue was already closed!"
end

close without enqueue

queue.close # => nil; no block needed, since no argument is given
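
A minimal sketch of how closing with an object behaves when the queue is already closed. ClosableSketch is a hypothetical stand-in that mirrors the #close source listed on this page so the example runs without the gem. The inline open check takes the place of try_push, whose body is not shown here and is therefore an assumption.

```ruby
require "monitor"

# Hypothetical stand-in, reduced to what #close needs. Not the library class.
class ClosableSketch
  UNDEFINED = Object.new # plays the role of the `undefined` marker

  attr_reader :items

  def initialize
    @items = []
    @lock = Monitor.new
    @cond = @lock.new_cond
    @open = true
  end

  def close(obj = UNDEFINED)
    if UNDEFINED.equal?(obj)
      @lock.synchronize do
        @open = false
        @cond.broadcast
      end
      nil
    elsif !block_given?
      raise ArgumentError, "no block given"
    else
      pushed = @lock.synchronize do
        was_open = @open             # assumption: try_push only succeeds while open
        @items.push(obj) if was_open
        @open = false
        @cond.broadcast
        was_open
      end
      yield unless pushed
      obj
    end
  end
end

queue = ClosableSketch.new
rejected = []

# The first close succeeds in pushing :done, so its block is not called.
first = queue.close(:done) { rejected << :done }

# The queue is now closed, so :late cannot be pushed and the block runs.
second = queue.close(:late) { rejected << :late }

p queue.items # prints [:done]
p rejected    # prints [:late]
```

In both calls the return value is the object passed in, so the block is the only signal that the push was rejected.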

Parameters:

  • obj (Object, nil) (defaults to: undefined)

Yields:

  • if obj could not be pushed onto the queue

Returns:

  • (Object, nil)

    obj

Raises:

  • (ArgumentError)

    if obj given, but no block given



# File 'lib/performer/queue.rb', line 91

def close(obj = undefined)
  if undefined.equal?(obj)
    @queue_mutex.synchronize do
      @open = false
      @queue_cond.broadcast
    end

    nil
  elsif not block_given?
    raise ArgumentError, "no block given"
  else
    pushed = false
    @queue_mutex.synchronize do
      pushed = try_push(obj)
      @open = false
      @queue_cond.broadcast
    end
    yield if not pushed

    obj
  end
end

#deq {|obj| ... } ⇒ Boolean

Retrieve an object from the queue, or block until one is available.

The behaviour is as follows:

  • empty, open: block until the queue is either not empty or not open

  • not empty, open: yield an item off the queue, return true

  • not empty, not open: yield an item off the queue, return false

  • empty, not open: return false

Examples:

open = queue.deq do |obj|
  # do something with obj
end
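
Because #deq returns false as soon as the queue is closed, even while items remain, a consumer that stops on the first false can leave items behind. The following is a sketch of a draining loop under that reading of the behaviour list. SketchQueue is a hypothetical stand-in that mirrors the source listed on this page so the example runs without the gem, and its try_push body is an assumption, since that method is not shown here.

```ruby
require "monitor"

# Hypothetical stand-in for the documented class. Not the library itself.
class SketchQueue
  def initialize
    @items = []
    @lock = Monitor.new
    @cond = @lock.new_cond
    @open = true
  end

  def enq(obj)
    raise ArgumentError, "no block given" unless block_given?
    pushed = @lock.synchronize do
      ok = try_push(obj)
      @cond.signal
      ok
    end
    yield unless pushed
    obj
  end

  def deq
    raise ArgumentError, "no block given" unless block_given?
    missing = Object.new # marks "nothing was dequeued"
    obj, was_open = @lock.synchronize do
      @cond.wait_while { @items.empty? && @open }
      [@items.empty? ? missing : @items.shift, @open]
    end
    yield obj unless missing.equal?(obj)
    was_open
  end

  def close
    @lock.synchronize do
      @open = false
      @cond.broadcast
    end
    nil
  end

  private

  # Assumption: pushing succeeds only while the queue is open.
  def try_push(obj)
    return false unless @open
    @items.push(obj)
    true
  end
end

queue = SketchQueue.new
seen = []

consumer = Thread.new do
  loop do
    got_item = false
    still_open = queue.deq do |obj|
      got_item = true
      seen << obj
    end
    # Stop only once the queue is closed and nothing was yielded.
    break unless still_open || got_item
  end
end

[1, 2, 3].each { |n| queue.enq(n) { raise "queue closed" } }
queue.close
consumer.join

p seen # prints [1, 2, 3]
```

Tracking whether the block ran distinguishes "not empty, not open" from "empty, not open", which the return value alone does not.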

Yields:

  • (obj)

    an item retrieved from the queue, if available

Returns:

  • (Boolean)

    true if the queue is open, false if it is closed

Raises:

  • (ArgumentError)

    if no block given



# File 'lib/performer/queue.rb', line 56

def deq
  unless block_given?
    raise ArgumentError, "no block given"
  end

  obj, was_open = @queue_mutex.synchronize do
    @queue_cond.wait_while { empty? and open? }

    obj = if empty?
      undefined
    else
      queue.shift
    end

    [obj, open?]
  end

  yield obj unless undefined.equal?(obj)
  was_open
end

#empty? ⇒ Boolean

Returns true if queue is empty.

Returns:

  • (Boolean)

    true if queue is empty



# File 'lib/performer/queue.rb', line 115

def empty?
  queue.empty?
end

#enq(obj) { ... } ⇒ Object

Push an object into the queue, or yield if not possible.

Examples:

pushing an item onto the queue

queue.enq(obj) do
  raise "Unable to push #{obj} into queue!"
end

Parameters:

  • obj

Yields:

  • if obj could not be pushed onto the queue

Returns:

  • obj

Raises:

  • (ArgumentError)

    if no block given



# File 'lib/performer/queue.rb', line 25

def enq(obj)
  unless block_given?
    raise ArgumentError, "no block given"
  end

  pushed = false
  @queue_mutex.synchronize do
    pushed = try_push(obj)
    @queue_cond.signal
  end
  yield if not pushed

  obj
end