Class: Concurrent::Channel
- Inherits:
-
Object
- Object
- Concurrent::Channel
show all
- Extended by:
- Forwardable
- Includes:
- Enumerable
- Defined in:
- lib/concurrent/channel.rb,
lib/concurrent/channel/tick.rb,
lib/concurrent/channel/selector.rb,
lib/concurrent/channel/buffer/base.rb,
lib/concurrent/channel/buffer/timer.rb,
lib/concurrent/channel/buffer/ticker.rb,
lib/concurrent/channel/buffer/sliding.rb,
lib/concurrent/channel/buffer/buffered.rb,
lib/concurrent/channel/buffer/dropping.rb,
lib/concurrent/channel/buffer/unbuffered.rb,
lib/concurrent/channel/selector/put_clause.rb,
lib/concurrent/channel/selector/take_clause.rb,
lib/concurrent/channel/selector/after_clause.rb,
lib/concurrent/channel/selector/error_clause.rb,
lib/concurrent/channel/selector/default_clause.rb
Overview
Defined Under Namespace
Modules: Buffer
Classes: Tick, ValidationError
Constant Summary
collapse
- Error =
Class.new(StandardError)
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(opts = {}) ⇒ Channel
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
# File 'lib/concurrent/channel.rb', line 46
# Sets up the channel's buffer and validator from +opts+.
#
# @param opts [Hash, Buffer::Base] an options hash, or an already-built buffer
#   to adopt directly. Hash keys read here: +:capacity+ (falling back to
#   +:size+), +:buffer+ (a key of BUFFER_TYPES) and +:validator+.
# @raise [ArgumentError] if a capacity is given together with
#   +buffer: :unbuffered+, if the selected buffer needs a capacity and none (or
#   one below 1) was given, or if +:buffer+ is not a key of BUFFER_TYPES.
def initialize(opts = {})
  if opts.is_a? Buffer::Base
    self.buffer = opts
    # Install a validator on this path too, so a channel wrapping a pre-built
    # buffer never carries an unset validator.
    # NOTE(review): presumably the validator is invoked when items are put or
    # offered -- confirm against `validate`.
    self.validator = DEFAULT_VALIDATOR
    return
  end

  capacity = opts[:capacity] || opts[:size]
  buffer = opts[:buffer]

  if capacity && buffer == :unbuffered
    raise ArgumentError, 'unbuffered channels cannot have a capacity'
  elsif capacity.nil? && buffer.nil?
    # Nothing specified at all: fall back to the unbuffered type.
    self.buffer = BUFFER_TYPES[:unbuffered].new
  elsif capacity == 0 && buffer == :buffered
    # A zero-capacity :buffered request is served by the unbuffered type.
    self.buffer = BUFFER_TYPES[:unbuffered].new
  elsif buffer == :unbuffered
    self.buffer = BUFFER_TYPES[:unbuffered].new
  elsif capacity.nil? || capacity < 1
    raise ArgumentError, 'capacity must be at least 1 for this buffer type'
  else
    buffer ||= :buffered
    buffer_class = BUFFER_TYPES[buffer]
    # Reject an unrecognised :buffer explicitly rather than calling `new` on nil.
    raise ArgumentError, "unknown buffer type: #{buffer.inspect}" if buffer_class.nil?

    self.buffer = buffer_class.new(capacity)
  end

  self.validator = opts.fetch(:validator, DEFAULT_VALIDATOR)
end
|
Class Method Details
.go(*args, &block) ⇒ Object
223
224
225
|
# File 'lib/concurrent/channel.rb', line 223
# Hands the block and its arguments to go_via, using GOROUTINES as the executor.
#
# @param args [Array] arguments forwarded unchanged to go_via
# @return [Object] whatever go_via returns
def go(*args, &block)
  executor = GOROUTINES
  go_via(executor, *args, &block)
end
|
.go_loop(*args, &block) ⇒ Object
232
233
234
|
# File 'lib/concurrent/channel.rb', line 232
# Hands the block and its arguments to go_loop_via, using GOROUTINES as the
# executor.
#
# @param args [Array] arguments forwarded unchanged to go_loop_via
# @return [Object] whatever go_loop_via returns
def go_loop(*args, &block)
  executor = GOROUTINES
  go_loop_via(executor, *args, &block)
end
|
.go_loop_via(executor, *args, &block) ⇒ Object
236
237
238
239
240
241
242
243
|
# File 'lib/concurrent/channel.rb', line 236
# Posts a task to +executor+ that keeps calling the block with +args+ until the
# block returns a falsy value.
#
# The block itself is passed to +executor.post+ as the first argument, followed
# by +args+; the posted task ignores those and uses the captured values.
#
# @param executor [#post] object receiving the task
# @param args [Array] arguments given to the block on every iteration
# @raise [ArgumentError] when no block is supplied
# @return [Object] whatever +executor.post+ returns
def go_loop_via(executor, *args, &block)
  raise ArgumentError, 'no block given' if block.nil?

  # Kernel#loop is kept deliberately: it is what the repetition is built on.
  repeater = proc do
    loop { break unless block.call(*args) }
  end
  executor.post(block, *args, &repeater)
end
|
.go_via(executor, *args, &block) ⇒ Object
227
228
229
230
|
# File 'lib/concurrent/channel.rb', line 227
# Posts the block to +executor+ together with +args+.
#
# @param executor [#post] object receiving the block
# @param args [Array] arguments forwarded to +executor.post+
# @raise [ArgumentError] when no block is supplied
# @return [Object] whatever +executor.post+ returns
def go_via(executor, *args, &block)
  raise ArgumentError, 'no block given' if block.nil?

  executor.post(*args, &block)
end
|
.select(*args) {|selector, args| ... } ⇒ Object
Also known as:
alt
215
216
217
218
219
220
|
# File 'lib/concurrent/channel.rb', line 215
# Creates a Selector, yields it (followed by +args+) to the block so the caller
# can configure it, then executes it.
#
# @param args [Array] extra values yielded to the block after the selector
# @yield [selector, *args] the new Selector and the given arguments
# @raise [ArgumentError] when no block is supplied
# @return [Object] the result of +Selector#execute+
def select(*args)
  raise ArgumentError, 'no block given' unless block_given?

  Selector.new.tap { |chooser| yield(chooser, *args) }.execute
end
|
.ticker(interval) ⇒ Object
Also known as:
tick
210
211
212
|
# File 'lib/concurrent/channel.rb', line 210
# Builds a Channel around a new Buffer::Ticker.
#
# @param interval [Object] value passed straight to +Buffer::Ticker.new+
# @return [Channel] a channel constructed from that ticker buffer
def ticker(interval)
  tick_buffer = Buffer::Ticker.new(interval)
  Channel.new(tick_buffer)
end
|
.timer(seconds) ⇒ Object
Also known as:
after
205
206
207
|
# File 'lib/concurrent/channel.rb', line 205
# Builds a Channel around a new Buffer::Timer.
#
# @param seconds [Object] value passed straight to +Buffer::Timer.new+
# @return [Channel] a channel constructed from that timer buffer
def timer(seconds)
  timer_buffer = Buffer::Timer.new(seconds)
  Channel.new(timer_buffer)
end
|
Instance Method Details
#each ⇒ Object
192
193
194
195
196
197
198
199
200
201
202
|
# File 'lib/concurrent/channel.rb', line 192
# Repeatedly calls do_next and yields every item that is not
# Concurrent::NULL. Iteration stops the first time do_next reports a NULL item
# together with a falsy "more" flag.
#
# @yield [item] each non-NULL item obtained from do_next
# @raise [ArgumentError] when no block is supplied
# @return [nil]
def each
  raise ArgumentError, 'no block given' unless block_given?

  loop do
    value, more_coming = do_next
    if value != Concurrent::NULL
      yield(value)
      next
    end
    # NULL item: keep going only while more items are still expected.
    break unless more_coming
  end
end
|
#next ⇒ Object
158
159
160
161
162
|
# File 'lib/concurrent/channel.rb', line 158
# Fetches one (item, more) pair from do_next, reporting a Concurrent::NULL item
# as nil.
#
# @return [Array] two elements: the item (nil in place of Concurrent::NULL) and
#   the "more" flag exactly as do_next returned it
def next
  value, more = do_next
  [value == Concurrent::NULL ? nil : value, more]
end
|
#next? ⇒ Boolean
164
165
166
167
168
169
170
171
172
|
# File 'lib/concurrent/channel.rb', line 164
# Fetches one (item, more) pair from do_next and wraps the item in a
# Concurrent::Maybe.
#
# @return [Array] two elements: +Concurrent::Maybe.nothing+ when the item is
#   Concurrent::NULL, otherwise +Concurrent::Maybe.just(item)+; followed by the
#   "more" flag exactly as do_next returned it
def next?
  value, more = do_next
  wrapped = value == Concurrent::NULL ? Concurrent::Maybe.nothing : Concurrent::Maybe.just(value)
  [wrapped, more]
end
|
#offer(item) ⇒ Object
98
99
100
101
|
# File 'lib/concurrent/channel.rb', line 98
# Offers +item+ after checking it with validate(item, false, false).
#
# @param item [Object] the value to offer
# @return [Object] +false+ when validate returns a falsy value (do_offer is
#   then not called); otherwise whatever do_offer returns
def offer(item)
  validate(item, false, false) ? do_offer(item) : false
end
|
#offer!(item) ⇒ Object
103
104
105
106
107
108
|
# File 'lib/concurrent/channel.rb', line 103
# Offers +item+, raising when the offer is not accepted.
#
# validate(item, false, true) is called first and its return value is ignored.
# NOTE(review): presumably validate raises for invalid items in this mode --
# confirm against its definition.
#
# @param item [Object] the value to offer
# @raise [Error] when do_offer returns a falsy value
# @return [Object] the truthy result of do_offer
def offer!(item)
  validate(item, false, true)
  do_offer(item) || raise(Error)
end
|
#offer?(item) ⇒ Boolean
110
111
112
113
114
115
116
117
118
|
# File 'lib/concurrent/channel.rb', line 110
# Offers +item+ and reports the outcome as a Concurrent::Maybe.
#
# @param item [Object] the value to offer
# @return [Object] +Concurrent::Maybe.nothing('invalid value')+ when
#   validate(item, true, false) is falsy; +Concurrent::Maybe.just(true)+ when
#   do_offer succeeds; +Concurrent::Maybe.nothing+ otherwise
def offer?(item)
  return Concurrent::Maybe.nothing('invalid value') unless validate(item, true, false)
  return Concurrent::Maybe.just(true) if do_offer(item)

  Concurrent::Maybe.nothing
end
|
#poll ⇒ Object
174
175
176
|
# File 'lib/concurrent/channel.rb', line 174
# Polls once via do_poll.
#
# @return [Object, nil] the polled item, or nil when do_poll returned
#   Concurrent::NULL
def poll
  value = do_poll
  return nil if value == Concurrent::NULL

  value
end
|
#poll! ⇒ Object
178
179
180
181
182
|
# File 'lib/concurrent/channel.rb', line 178
# Polls once via do_poll, raising when nothing was obtained.
#
# @raise [Error] when do_poll returns Concurrent::NULL
# @return [Object] the polled item
def poll!
  value = do_poll
  return value unless value == Concurrent::NULL

  raise Error
end
|
#poll? ⇒ Boolean
184
185
186
187
188
189
190
|
# File 'lib/concurrent/channel.rb', line 184
# Polls once via do_poll and wraps the outcome in a Concurrent::Maybe.
#
# @return [Object] +Concurrent::Maybe.nothing+ when do_poll returned
#   Concurrent::NULL, otherwise +Concurrent::Maybe.just(item)+
def poll?
  value = do_poll
  return Concurrent::Maybe.nothing if value == Concurrent::NULL

  Concurrent::Maybe.just(value)
end
|
#put(item) ⇒ Object
Also known as:
send, <<
74
75
76
77
|
# File 'lib/concurrent/channel.rb', line 74
# Puts +item+ after checking it with validate(item, false, false).
#
# @param item [Object] the value to put
# @return [Object] +false+ when validate returns a falsy value (do_put is then
#   not called); otherwise whatever do_put returns
def put(item)
  if validate(item, false, false)
    do_put(item)
  else
    false
  end
end
|
#put!(item) ⇒ Object
81
82
83
84
85
86
|
# File 'lib/concurrent/channel.rb', line 81
# Puts +item+, raising when the put does not succeed.
#
# validate(item, false, true) is called first and its return value is ignored.
# NOTE(review): presumably validate raises for invalid items in this mode --
# confirm against its definition.
#
# @param item [Object] the value to put
# @raise [Error] when do_put returns a falsy value
# @return [Object] the truthy result of do_put
def put!(item)
  validate(item, false, true)
  stored = do_put(item)
  return stored if stored

  raise Error
end
|
#put?(item) ⇒ Boolean
88
89
90
91
92
93
94
95
96
|
# File 'lib/concurrent/channel.rb', line 88
# Puts +item+ and reports the outcome as a Concurrent::Maybe.
#
# @param item [Object] the value to put
# @return [Object] +Concurrent::Maybe.nothing('invalid value')+ when
#   validate(item, true, false) is falsy; +Concurrent::Maybe.just(true)+ when
#   do_put succeeds; +Concurrent::Maybe.nothing+ otherwise
def put?(item)
  return Concurrent::Maybe.nothing('invalid value') unless validate(item, true, false)

  do_put(item) ? Concurrent::Maybe.just(true) : Concurrent::Maybe.nothing
end
|
#take ⇒ Object
Also known as:
receive, ~
120
121
122
123
|
# File 'lib/concurrent/channel.rb', line 120
# Takes one item via do_take.
#
# @return [Object, nil] the taken item, or nil when do_take returned
#   Concurrent::NULL
def take
  value = do_take
  return nil if value == Concurrent::NULL

  value
end
|
#take! ⇒ Object
127
128
129
130
131
|
# File 'lib/concurrent/channel.rb', line 127
# Takes one item via do_take, raising when nothing was obtained.
#
# @raise [Error] when do_take returns Concurrent::NULL
# @return [Object] the taken item
def take!
  value = do_take
  return value unless value == Concurrent::NULL

  raise Error
end
|
#take? ⇒ Boolean
133
134
135
136
137
138
139
140
141
|
# File 'lib/concurrent/channel.rb', line 133
# Takes one item via do_take and wraps the outcome in a Concurrent::Maybe.
#
# @return [Object] +Concurrent::Maybe.nothing+ when do_take returned
#   Concurrent::NULL, otherwise +Concurrent::Maybe.just(item)+
def take?
  value = do_take
  return Concurrent::Maybe.nothing if value == Concurrent::NULL

  Concurrent::Maybe.just(value)
end
|