Class: Consumer

Inherits:
Object
Includes:
Sink
Defined in:
lib/coroutines/base.rb

Overview

A class implementing consumer coroutines.

A Consumer can be created by the following methods:

  • Object#consum_for

  • Consumer.new

  • Transformer#out_connect

See Object#consum_for for an explanation of the basic concepts.

Defined Under Namespace

Classes: Yielder

Instance Attribute Summary

Instance Method Summary

Methods included from Sink

#<=, #in_connect, #input_map, #input_reduce, #input_reject, #input_select

Constructor Details

#initialize(&block) ⇒ Consumer

Call sequence:

Consumer.new { |yielder| ... } -> consumer

Creates a new Consumer coroutine, which can be used as a Sink.

The block is called immediately with a “yielder” object as its parameter. The block retrieves each value from the consumption context by calling the yielder's await method.

consum = Consumer.new do |y|
  a = y.await
  b = y.await
  "#{a} + #{b} = #{a + b}"
end

consum << 42 << 24
consum.close  # => "42 + 24 = 66"


# File 'lib/coroutines/base.rb', line 53

def initialize(&block)
	@fiber = Fiber.new(&block)
	@result = nil

	self << Yielder.new
end
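
The constructor relies on a Fiber to run the block. The following is a minimal, self-contained sketch of that mechanism, not the library's code: MiniConsumer is a hypothetical stand-in that mirrors the initialize and << listings on this page, and its Yielder#await is assumed to simply call Fiber.yield, since the Yielder source is not shown here.

```ruby
require 'fiber' # needed for Fiber#alive? on Ruby versions before 3.0

# Hypothetical stand-in for Consumer, for illustration only.
class MiniConsumer
  # Assumption: await suspends the fiber and returns the next value fed in.
  class Yielder
    def await
      Fiber.yield
    end
  end

  attr_reader :result

  def initialize(&block)
    @fiber = Fiber.new(&block)
    @result = nil
    # The first resume starts the block and passes the yielder as its argument.
    self << Yielder.new
  end

  def <<(obj)
    raise StopIteration unless @fiber.alive?
    value = @fiber.resume(obj)
    # Once the fiber has finished, resume returns the block's return value.
    @result = value unless @fiber.alive?
    self
  end
end

log = []
consumer = MiniConsumer.new do |y|
  log << :started
  a = y.await
  b = y.await
  a + b
end

started_immediately = (log == [:started]) # the block ran up to its first await
pending_result = consumer.result          # nil while the block is still waiting
consumer << 42 << 24
sum = consumer.result                     # => 66
```

In this sketch the block has already run up to its first await by the time the constructor returns, which is why the first value fed with << becomes the return value of that await.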

Instance Attribute Details

#result ⇒ Object (readonly)

After the consumer has terminated (which may have happened in response to Consumer#close), result contains the value returned by the consumer.



# File 'lib/coroutines/base.rb', line 67

def result
  @result
end

Instance Method Details

#<<(obj) ⇒ Object

Call sequence:

consumer << obj -> consumer

Feeds obj as an input to the consumer.

Raises:

  • (StopIteration) if the consumer has already terminated


# File 'lib/coroutines/base.rb', line 73

def <<(obj)
	raise StopIteration unless @fiber.alive?
	value = @fiber.resume(obj)
	@result = value unless @fiber.alive?
	self
end
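
Because << raises StopIteration once the fiber is no longer alive, a caller that feeds more values than the consumer awaits has to expect that exception. A minimal sketch of handling it follows; MiniConsumer is a hypothetical stand-in mirroring the listings on this page, and Yielder#await is assumed to call Fiber.yield.

```ruby
require 'fiber' # needed for Fiber#alive? on Ruby versions before 3.0

# Hypothetical stand-in for Consumer, for illustration only.
class MiniConsumer
  class Yielder
    def await
      Fiber.yield # assumption: the Yielder source is not shown on this page
    end
  end

  attr_reader :result

  def initialize(&block)
    @fiber = Fiber.new(&block)
    @result = nil
    self << Yielder.new
  end

  def <<(obj)
    raise StopIteration unless @fiber.alive?
    value = @fiber.resume(obj)
    @result = value unless @fiber.alive?
    self
  end
end

# This consumer awaits exactly two values and then terminates.
pair = MiniConsumer.new { |y| [y.await, y.await] }

accepted = []
begin
  [1, 2, 3].each do |x|
    pair << x
    accepted << x
  end
rescue StopIteration
  # The consumer finished after the second value, so feeding 3 was refused.
end

accepted     # => [1, 2]
pair.result  # => [1, 2]
```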

#close ⇒ Object

Call sequence:

consumer.close -> obj

Terminates the consumer by raising StopIteration at the point where it last requested an input value. The returned obj is the return value of the consumer. If the consumer has already terminated, its stored result is returned.



# File 'lib/coroutines/base.rb', line 86

def close
	@result = @fiber.raise StopIteration if @fiber.alive?
	return @result
rescue StopIteration
	return @result
end
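
One consequence of raising StopIteration inside the consumer is that a block written around Kernel#loop ends cleanly on close, because loop rescues StopIteration. The sketch below illustrates this under stated assumptions: MiniConsumer is a hypothetical stand-in mirroring the listings on this page, Yielder#await is assumed to call Fiber.yield, and Fiber#raise requires Ruby 2.7 or later.

```ruby
require 'fiber' # needed for Fiber#alive? on Ruby versions before 3.0

# Hypothetical stand-in for Consumer, for illustration only.
class MiniConsumer
  class Yielder
    def await
      Fiber.yield # assumption: the Yielder source is not shown on this page
    end
  end

  def initialize(&block)
    @fiber = Fiber.new(&block)
    @result = nil
    self << Yielder.new
  end

  def <<(obj)
    raise StopIteration unless @fiber.alive?
    value = @fiber.resume(obj)
    @result = value unless @fiber.alive?
    self
  end

  def close
    # Fiber#raise resumes the fiber by raising at its pending Fiber.yield.
    @result = @fiber.raise StopIteration if @fiber.alive?
    @result
  rescue StopIteration
    # The block did not rescue the exception, so no return value was produced.
    @result
  end
end

summer = MiniConsumer.new do |y|
  total = 0
  loop { total += y.await } # Kernel#loop rescues StopIteration and exits
  total
end

summer << 1 << 2 << 3
total = summer.close # => 6
again = summer.close # => 6, the stored result of the finished consumer

# A block that does not rescue StopIteration is terminated without a result.
plain = MiniConsumer.new { |y| y.await; :done }
unhandled = plain.close # => nil
```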

#inspect ⇒ Object



# File 'lib/coroutines/base.rb', line 60

def inspect
	status = if @fiber.alive? then "running" else @result.inspect end
	"#<Consumer: 0x#{object_id.to_s(16)} (#{status})>"
end