Class: Hx::Interop::Channel
- Inherits:
-
Object
- Object
- Hx::Interop::Channel
show all
- Defined in:
- lib/interop/channel.rb
Overview
Defined Under Namespace
Classes: Blocker, Get, Put
Constant Summary
collapse
- Deadlock =
Class.new(Error)
- Closed =
Class.new(Error)
Instance Method Summary
collapse
Constructor Details
#initialize(butter_limit = 0) ⇒ Channel
Returns a new instance of Channel.
55
56
57
58
59
60
61
|
# File 'lib/interop/channel.rb', line 55
# Sets up an open channel with empty buffer and waiter queues.
#
# @param buffer_limit [Integer] number of objects that #<< may store in the
#   buffer before a sender has to wait; with the default of 0 nothing is
#   buffered
def initialize(buffer_limit = 0)
  @buffer_limit = buffer_limit
  @buffer = []
  @gets = []   # receivers waiting for a value
  @puts = []   # senders waiting for a receiver
  @closed = false # set explicitly so the flag is never an uninitialised ivar
  @mutex = Mutex.new
end
|
Instance Method Details
#<<(obj) ⇒ Object
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
# File 'lib/interop/channel.rb', line 68
# Sends +obj+ into the channel.
#
# Under the mutex: a waiting receiver, if any, is resolved with +obj+;
# otherwise +obj+ is buffered while the buffer is below its limit; otherwise
# a Put is queued and, after the mutex is released, waited on.
#
# @param obj [Object] the value to send
# @return [self]
# @raise [Closed] if the channel has been closed
# @raise [Deadlock] if the sender would have to wait while Thread.list
#   contains only one thread
def <<(obj)
  pending = @mutex.synchronize do
    raise Closed if @closed

    receiver = @gets.shift
    if receiver
      receiver.resolve(obj)
      nil
    elsif @buffer_limit > @buffer.length
      @buffer.push(obj)
      nil
    else
      raise Deadlock if Thread.list.one?

      Put.new(obj).tap { |queued| @puts.push(queued) }
    end
  end
  # Wait outside the mutex so other threads can reach the channel.
  pending&.wait
  self
end
|
#close ⇒ Object
106
107
108
109
110
111
112
113
114
115
|
# File 'lib/interop/channel.rb', line 106
# Closes the channel and releases everything waiting on it.
#
# Every queued sender is resolved and every queued receiver is resolved with
# nil. The closed check is made while holding the mutex so that two threads
# calling #close concurrently cannot both pass it.
#
# @return [Array] the buffer, including any objects still in it
# @raise [Closed] if the channel was already closed
def close
  @mutex.synchronize do
    raise Closed if @closed

    @closed = true
    @puts.each(&:resolve)
    @gets.each { |receiver| receiver.resolve(nil) }
  end
  @buffer
end
|
#get ⇒ Object
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
# File 'lib/interop/channel.rb', line 89
# Receives the next object from the channel.
#
# Under the mutex: returns nil if the channel is closed; otherwise returns
# the result of resolving the oldest queued sender, if any; otherwise returns
# the oldest buffered object. When nothing is available a Get is queued and,
# after the mutex is released, the result of waiting on it is returned.
#
# NOTE(review): a queued sender is served before the buffer, so with a full
# buffer the sender's object may overtake buffered ones - confirm intended.
#
# @return [Object, nil] the received object, or nil when closed
# @raise [Deadlock] if the receiver would have to wait while Thread.list
#   contains only one thread
def get
  waiter = nil
  @mutex.synchronize do
    return if @closed

    sender = @puts.shift
    return sender.resolve if sender
    # `empty?` rather than `any?`: `any?` is false for a buffer holding only
    # nil/false, which would leave those values undeliverable.
    return @buffer.shift unless @buffer.empty?
    raise Deadlock if Thread.list.one?

    waiter = Get.new
    @gets << waiter
  end
  waiter.wait
end
|
#put(*objects) ⇒ Object
63
64
65
66
|
# File 'lib/interop/channel.rb', line 63
# Sends each of the given objects through #<<, in argument order.
#
# @param objects [Array<Object>] the values to send
# @return [self]
def put(*objects)
  objects.each { |object| self << object }
  self
end
|