Class: Thimble::ThimbleQueue

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/ThimbleQueue.rb

Direct Known Subclasses

Thimble

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(size, name) ⇒ ThimbleQueue

Returns a new instance of ThimbleQueue.

Raises:

  • (ArgumentError)


9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/ThimbleQueue.rb', line 9

# Builds an empty, open queue with a fixed capacity.
#
# @param size [Integer] capacity stored in @size; must be at least 1
# @param name [Object] label stored in @name and interpolated into log messages
# @raise [ArgumentError] if size is less than 1
def initialize(size, name)
  # The guard accepts a size of exactly 1, so the message says "at least 1".
  raise ArgumentError, "make sure there is a size for the queue of at least 1! size received #{size}" unless size >= 1

  # Per-instance identifier: SHA-256 digest of a large random number plus the current epoch seconds.
  @id = Digest::SHA256.digest(rand(10**100).to_s + Time.now.to_i.to_s)
  @name = name
  @size = size
  @mutex = Mutex.new
  @queue = []
  @closed = false
  @close_now = false
  # Condition variables used together with @mutex: consumers wait on @empty, producers on @full.
  @empty = ConditionVariable.new
  @full = ConditionVariable.new
  @logger = Logger.new(STDOUT)
  # Only UNKNOWN-severity messages pass by default, so the debug logging is silent until setLogger is called.
  @logger.sev_threshold = Logger::UNKNOWN
end

Instance Attribute Details

#size ⇒ Object (readonly)

Returns the value of attribute size.



8
9
10
# File 'lib/ThimbleQueue.rb', line 8

# Reader for the capacity that was passed to the constructor.
#
# @return [Object] the value stored in @size
def size
  @size
end

Instance Method Details

#+(other) ⇒ ThimbleQueue

Will concatenate an enumerable to the ThimbleQueue

Parameters:

  • (Enumerable)

Returns:

Raises:

  • (ArgumentError)


43
44
45
46
47
48
49
# File 'lib/ThimbleQueue.rb', line 43

# Builds a new ThimbleQueue holding this queue's items followed by those of +other+.
#
# The new queue's capacity is this queue's #length plus the element count of
# +other+. This queue is read through #each, which removes the items it yields.
# NOTE(review): #each keeps calling #next, which waits while the queue is empty
# and not closed -- presumably callers close this queue first; confirm.
#
# @param other [Enumerable] items to append after this queue's items
# @return [ThimbleQueue] a new queue carrying this queue's name
# @raise [ArgumentError] if other is not an Enumerable
def +(other)
  # is_a? also accepts objects that extend Enumerable, which a class comparison rejects.
  raise ArgumentError, "+ requires another Enumerable!" unless other.is_a?(Enumerable)

  # Enumerables without #length (e.g. Range) are materialised once so they can
  # be both counted and pushed.
  appended = other.respond_to?(:length) ? other : other.to_a
  merged_thimble = ThimbleQueue.new(length + appended.length, @name)
  each { |item| merged_thimble.push(item) }
  appended.each { |item| merged_thimble.push(item) }
  merged_thimble
end

#close(now = false) ⇒ nil

Closes the ThimbleQueue

Parameters:

  • (TrueClass, FalseClass)

Returns:

  • (nil)

Raises:

  • (ArgumentError)


110
111
112
113
114
115
116
117
118
119
120
# File 'lib/ThimbleQueue.rb', line 110

# Marks the queue as closed and wakes every thread waiting on either
# condition variable.
#
# @param now [Boolean] when true, @close_now is set as well
# @return [Object] whatever the final @logger.debug call returns
# @raise [ArgumentError] unless now is exactly true or false
def close(now = false)
  boolean_given = now == true || now == false
  raise ArgumentError, "now must be true or false" unless boolean_given

  @logger.debug("#{@name} is closing")
  @mutex.synchronize do
    @closed = true
    @close_now = true if now
    # Wake threads blocked on @full first, then those blocked on @empty, so
    # they can observe the new flags.
    [@full, @empty].each(&:broadcast)
  end
  @logger.debug("#{@name} is closed: #{@closed} now: #{@close_now}")
end

#closed? ⇒ TrueClass, FalseClass

checks if the ThimbleQueue is closed

Returns:

  • (TrueClass, FalseClass)


134
135
136
# File 'lib/ThimbleQueue.rb', line 134

# Reports whether #close has been called on this queue.
#
# @return [Boolean] the current value of @closed
def closed?
  @closed
end

#each ⇒ Object



28
29
30
31
32
# File 'lib/ThimbleQueue.rb', line 28

# Yields the payload (+item+) of every entry obtained from #next until #next
# returns nil. Entries are removed from the queue as they are read.
#
# @yield [Object] the +item+ of each entry
# @return [Enumerator] when called without a block
# @return [nil] once #next has returned nil
def each
  # Without a block, return an Enumerator instead of consuming an entry and
  # then failing on yield.
  return to_enum(:each) unless block_given?

  # `next` is a keyword, so the method has to be called with an explicit receiver.
  while (entry = self.next)
    yield entry.item
  end
end

#length ⇒ Fixnum

Returns the size of the ThimbleQueue

Returns:

  • (Fixnum)


36
37
38
# File 'lib/ThimbleQueue.rb', line 36

# Returns the same value as #size, i.e. the capacity the queue was built
# with -- not the number of items currently held in @queue.
#
# @return [Object] the result of #size
def length
  size
end

#next ⇒ Object

Returns the first item in the queue

Returns:

  • (Object)


53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/ThimbleQueue.rb', line 53

# Removes and returns the entry at the head of the queue, waiting on @empty
# while the queue has nothing to hand out and is still open.
#
# @return [Object] the entry shifted off @queue
# @return [nil] when the queue is empty and closed, or once @close_now is set
def next
  @mutex.synchronize do
    until @close_now
      head = @queue.shift
      @logger.debug("#{@name}'s queue shifted to: #{head}")

      unless head.nil?
        # An entry was taken: wake threads waiting on either condition.
        @full.broadcast
        @empty.broadcast
        return head
      end

      @logger.debug("#{@name}'s queue is currently closed?: #{closed?}")
      return nil if closed?

      # Nothing available yet; releases @mutex until another thread broadcasts @empty.
      @empty.wait(@mutex)
    end
  end
end

#push(x) ⇒ Object

This will push whatever it is handed to the queue

Parameters:

  • (Object)

Raises:

  • (RuntimeError)


73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/ThimbleQueue.rb', line 73

# Adds x to the queue, waiting on @full for as long as offer(x) reports
# that the item was not accepted.
#
# NOTE(review): @closed is only checked once, before the lock is taken; what
# happens to a waiting pusher after #close depends on offer, which is defined
# outside this listing -- confirm.
#
# @param x [Object] the value to enqueue
# @return [Object] whatever the final @logger.debug call returns
# @raise [RuntimeError] if the queue is already closed
def push(x)
  raise RuntimeError, "Queue is closed!" if @closed

  @logger.debug("Pushing into #{@name} values: #{x}")
  @mutex.synchronize do
    until offer(x)
      # Releases @mutex until another thread broadcasts @full.
      @full.wait(@mutex)
      @logger.debug("#{@name} is waiting on full")
    end
    # Wake threads waiting on @empty now that offer has accepted the item.
    @empty.broadcast
  end
  @logger.debug("Finished pushing int #{@name}: #{x}")
end

#push_flat(x) ⇒ nil

This will flatten one level of an enumerable (anything that responds to each) and feed its elements one at a time to the queue; any other object is pushed as-is.

Parameters:

  • (Object, Enumerable)

Returns:

  • (nil)

Raises:

  • (RuntimeError)


90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/ThimbleQueue.rb', line 90

# Enqueues x. When x responds to each, every element it yields is handed to
# #push individually (one level only); otherwise x itself is enqueued,
# waiting on @full while offer(x) reports that it was not accepted.
#
# @param x [Object, Enumerable] a single value or a collection of values
# @return [Object] whatever the final @logger.debug call returns
# @raise [RuntimeError] if the queue is already closed
def push_flat(x)
  raise RuntimeError, "Queue is closed!" if @closed

  @logger.debug("Pushing flat into #{@name} values: #{x}")
  if x.respond_to?(:each)
    x.each { |element| push(element) }
  else
    @mutex.synchronize do
      until offer(x)
        @logger.debug("#{@name} is waiting on full")
        # Releases @mutex until another thread broadcasts @full.
        @full.wait(@mutex)
      end
      @empty.broadcast
    end
  end
  @logger.debug("Finished pushing flat into #{@name} values: #{x}")
end

#setLogger(level) ⇒ Object



24
25
26
# File 'lib/ThimbleQueue.rb', line 24

# Changes the severity threshold of this queue's logger.
#
# @param level [Integer] a Logger severity such as Logger::DEBUG
# @return [Object] the level that was passed in
def setLogger(level)
  # Logger#sev_threshold= is an alias of Logger#level=.
  @logger.level = level
end

#to_a ⇒ Array[Object]

Will force the ThimbleQueue into an array

Returns:

  • (Array[Object])


124
125
126
127
128
129
130
# File 'lib/ThimbleQueue.rb', line 124

# Drains the queue into an Array by calling #next until it returns nil and
# collecting the +item+ of each entry, in the order received.
#
# @return [Array<Object>] the collected items
def to_a
  collected = []
  # `next` is a keyword, so the method has to be called with an explicit receiver.
  entry = self.next
  while entry
    collected.push(entry.item)
    entry = self.next
  end
  collected
end