Class: ProducerConsumer

Inherits:
Object
  • Object
show all
Defined in:
lib/producer_consumer.rb,
lib/producer_consumer/version.rb

Defined Under Namespace

Classes: Finished, NoConsumer, NoProducer, Producer

Constant Summary collapse

VERSION =
'0.1'

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeProducerConsumer

Oh god. It’s a PATTERN.



28
29
30
31
32
33
# File 'lib/producer_consumer.rb', line 28

def initialize
  @queue = Queue.new 
  @state = :free
  @producers = []
  @consumers = []
end

Class Method Details

.consume(*a, &b) ⇒ Object



19
20
21
# File 'lib/producer_consumer.rb', line 19

def self.consume(*a, &b)
  new.consume(*a, &b)
end

.produce(*a, &b) ⇒ Object



23
24
25
# File 'lib/producer_consumer.rb', line 23

def self.produce(*a, &b)
  new.produce(*a, &b)
end

Instance Method Details

#consume(n = 1, &block) ⇒ Object

Registers n consumers



57
58
59
60
61
62
63
# File 'lib/producer_consumer.rb', line 57

def consume(n = 1, &block)
  @consumers += (0...n).map do |c|
    block
  end

  self
end

#consumer(block) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/producer_consumer.rb', line 65

def consumer(block)
  Thread.new do
    begin
      loop do
        x = @queue.pop

        if x == Finished
          raise Finished
        end

        block.call x
      end
    rescue Finished
    end
  end
end

#produce(n = 1, &block) ⇒ Object

Registers n producers



36
37
38
39
40
41
42
# File 'lib/producer_consumer.rb', line 36

def produce(n = 1, &block)
  @producers += (0...n).map do |i|
    block
  end

  self
end

#producer(block) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
# File 'lib/producer_consumer.rb', line 44

def producer(block)
  Thread.new do
    p = Producer.new
    begin
      loop do
        @queue << block.call(p)
      end
    rescue Finished
    end
  end
end

#runObject

Run!

Raises:



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/producer_consumer.rb', line 83

def run
  # Can't run this more than once.
  unless @state == :free
    raise RuntimeError, "Already running"
  end

  # Validate we will actually do something.
  raise NoConsumer if @consumers.empty?
  raise NoProducer if @producers.empty?

  # Start producers and consumers
  @state = :starting
  consumers = @consumers.map do |block|
    consumer block
  end
  producers = @producers.map do |block|
    producer block
  end

  # Wait for producers to finish
  @state = :waiting_for_producers
  producers.each { |t| t.join }

  # Signal consumers to finish...
  @state = :waiting_for_consumers
  consumers.size.times do
    @queue << Finished
  end
  consumers.each { |t| t.join }

  @state = :free
end