Class: Thread::Pipe

Inherits:
Object
Defined in:
lib/thread/pipe.rb

Overview

A pipe lets you run a sequence of tasks over a set of data in parallel. Each datum inserted into the pipe is passed along through queues to the functions composing the pipe, and the final result is placed in the final (output) queue.
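
To make the queue-per-stage idea concrete, here is a minimal sketch built only from the standard Queue and Thread classes. It is not the library's implementation; run_stages is a hypothetical helper, and it assumes one worker thread per function.

```ruby
# Illustrative sketch only: plain Queue and Thread objects, not Thread::Pipe itself.
# run_stages is a hypothetical helper name.
def run_stages(values, *funcs)
  # One queue in front of every stage, plus a final queue for the results.
  queues = Array.new(funcs.size + 1) { Queue.new }

  # Each worker reads from its own queue and writes to the next one.
  workers = funcs.each_with_index.map do |func, i|
    Thread.new do
      loop { queues[i + 1].enq func.call(queues[i].deq) }
    end
  end

  values.each { |v| queues.first.enq v }

  # Each stage is a single thread on a FIFO queue, so order is preserved.
  results = Array.new(values.size) { queues.last.deq }
  workers.each(&:kill)
  results
end

p run_stages([1, 2, 3], ->(n) { n * 2 }, ->(n) { "value: #{n}" })
# prints ["value: 2", "value: 4", "value: 6"]
```

While the second stage formats one value, the first stage can already be doubling the next, which is where the parallelism comes from.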

Defined Under Namespace

Classes: Task

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(input = Queue.new, output = Queue.new) ⇒ Pipe

Create a pipe, using the optionally passed objects as the input and output queues.

The objects must respond to #enq and #deq, and block on #deq.
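
As an illustration of that contract, the sketch below wraps a standard Queue in a hypothetical CountingQueue class (not part of the library). It also defines #empty?, on the assumption that the queue will be used with Pipe#empty?, which calls #empty? on both queues.

```ruby
# Hypothetical wrapper: CountingQueue is an illustration, not a library class.
# It forwards #enq and #deq to a standard Queue, whose #deq blocks while empty.
class CountingQueue
  attr_reader :enqueued

  def initialize
    @queue    = Queue.new
    @enqueued = 0
    @lock     = Mutex.new
  end

  def enq(item)
    @lock.synchronize { @enqueued += 1 }
    @queue.enq item
    self
  end

  # Blocks until an item is available unless non_block is true.
  def deq(non_block = false)
    @queue.deq(non_block)
  end

  # Pipe#empty? asks both queues whether they are empty.
  def empty?
    @queue.empty?
  end
end

q = CountingQueue.new
q.enq(:a).enq(:b)
p q.deq       # prints :a
p q.enqueued  # prints 2
```

An object like this could then be passed as the first or second argument to the constructor in place of the default Queue.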



# File 'lib/thread/pipe.rb', line 57

def initialize (input = Queue.new, output = Queue.new)
	@tasks = []

	@input  = input
	@output = output

	ObjectSpace.define_finalizer self, self.class.finalizer(@tasks)
end

Class Method Details

.finalizer(tasks) ⇒ Object



# File 'lib/thread/pipe.rb', line 67

def self.finalizer (tasks)
	proc {
		tasks.each(&:kill)
	}
end
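
Building the finalizer in a class method means the proc closes over the tasks array only and not over the pipe, which matters because a finalizer that references its own object keeps that object from being collected. The sketch below shows the same pattern with plain threads standing in for tasks; make_finalizer is a hypothetical name, and the proc is called by hand rather than by the garbage collector.

```ruby
# Sketch with plain threads standing in for the pipe's tasks.
# make_finalizer is a hypothetical name mirroring .finalizer.
def make_finalizer(workers)
  # The proc captures the workers array only, never the owning object.
  proc { workers.each(&:kill) }
end

workers   = Array.new(2) { Thread.new { sleep } }
finalizer = make_finalizer(workers)

# ObjectSpace would invoke the proc with the object id; call it by hand here.
finalizer.call(0)
workers.each(&:join)
p workers.map(&:alive?)  # prints [false, false]
```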

Instance Method Details

#deq(non_block = false) ⇒ Object Also known as: pop, ~

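Get an element from the output queue. The call is forwarded to the output queue's #deq, so with the default Queue it blocks until a result is available unless non_block is true.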



# File 'lib/thread/pipe.rb', line 106

def deq (non_block = false)
	@output.deq(non_block)
end
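
Because #deq simply delegates, the non-blocking behaviour is whatever the output queue provides. A small sketch with a standard Queue in place of the pipe's output queue (try_deq is a hypothetical helper, not part of the library):

```ruby
# try_deq is a hypothetical helper, not part of the library.
def try_deq(queue)
  queue.deq(true)   # non_block = true: never waits
rescue ThreadError
  nil               # a standard Queue raises ThreadError when it is empty
end

output = Queue.new
output.enq 42

p try_deq(output)  # prints 42
p try_deq(output)  # prints nil
```

Note that returning nil for "nothing yet" is ambiguous if nil itself can be a valid result; in that case rescuing ThreadError at the call site is the safer choice.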

#empty? ⇒ Boolean

Check if the pipe is empty, that is, whether the input queue, the output queue, and every task are all empty.

Returns:

  • (Boolean)


# File 'lib/thread/pipe.rb', line 89

def empty?
	@input.empty? && @output.empty? && @tasks.all?(&:empty?)
end

#enq(data) ⇒ Object Also known as: push, <<

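Insert data into the pipe. Returns the pipe itself so that calls can be chained. If no task has been added yet, nothing is enqueued and nil is returned.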



# File 'lib/thread/pipe.rb', line 94

def enq (data)
	return if @tasks.empty?

	@input.enq data

	self
end

#|(func) ⇒ Object

Add a task to the pipe. The task must respond to #call and #arity, and #arity must return 1; otherwise an ArgumentError is raised. Returns the pipe itself, so tasks can be chained.



# File 'lib/thread/pipe.rb', line 75

def | (func)
	if func.arity != 1
		raise ArgumentError, 'wrong arity'
	end

	Task.new(func, (@tasks.empty? ? @input : Queue.new), @output).tap {|t|
		@tasks.last.output = t.input unless @tasks.empty?
		@tasks << t
	}

	self
end
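
The arity requirement rules out some callables that might look suitable at first glance. The sketch below uses a hypothetical accepts? helper that mirrors the documented requirements (it is not a library method) to show which common callables would pass.

```ruby
# accepts? is a hypothetical helper mirroring the requirements of #|.
def accepts?(func)
  func.respond_to?(:call) && func.arity == 1
end

p accepts?(->(x) { x * 2 })     # prints true
p accepts?(proc { |x| x * 2 })  # prints true
p accepts?(->(x, y) { x + y })  # prints false (arity is 2)
p accepts?(-> { 1 })            # prints false (arity is 0)
p accepts?(->(*xs) { xs })      # prints false (arity is -1)
p accepts?(:upcase.to_proc)     # prints false (arity is negative)
```

A callable with optional or splat parameters reports a negative arity, so wrapping it in a one-argument lambda such as ->(x) { x.upcase } is one way to satisfy the check.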