Class: Concurrent::Channel
- Inherits:
-
Object
- Object
- Concurrent::Channel
show all
- Extended by:
- Forwardable
- Includes:
- Enumerable
- Defined in:
- lib/concurrent/channel.rb,
lib/concurrent/channel/tick.rb,
lib/concurrent/channel/selector.rb,
lib/concurrent/channel/buffer/base.rb,
lib/concurrent/channel/buffer/timer.rb,
lib/concurrent/channel/buffer/ticker.rb,
lib/concurrent/channel/buffer/sliding.rb,
lib/concurrent/channel/buffer/buffered.rb,
lib/concurrent/channel/buffer/dropping.rb,
lib/concurrent/channel/buffer/unbuffered.rb,
lib/concurrent/channel/selector/put_clause.rb,
lib/concurrent/channel/selector/take_clause.rb,
lib/concurrent/channel/selector/after_clause.rb,
lib/concurrent/channel/selector/error_clause.rb,
lib/concurrent/channel/selector/default_clause.rb
Overview
Defined Under Namespace
Modules: Buffer
Classes: Tick, ValidationError
Constant Summary
collapse
- Error =
Class.new(StandardError)
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(opts = {}) ⇒ Channel
Returns a new instance of Channel.
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
# File 'lib/concurrent/channel.rb', line 46
def initialize(opts = {})
if opts.is_a? Buffer::Base
self.buffer = opts
return
end
capacity = opts[:capacity] || opts[:size]
buffer = opts[:buffer]
if capacity && buffer == :unbuffered
raise ArgumentError.new('unbuffered channels cannot have a capacity')
elsif capacity.nil? && buffer.nil?
self.buffer = BUFFER_TYPES[:unbuffered].new
elsif capacity == 0 && buffer == :buffered
self.buffer = BUFFER_TYPES[:unbuffered].new
elsif buffer == :unbuffered
self.buffer = BUFFER_TYPES[:unbuffered].new
elsif capacity.nil? || capacity < 1
raise ArgumentError.new('capacity must be at least 1 for this buffer type')
else
buffer ||= :buffered
self.buffer = BUFFER_TYPES[buffer].new(capacity)
end
self.validator = opts.fetch(:validator, DEFAULT_VALIDATOR)
end
|
Class Method Details
.go(*args, &block) ⇒ Object
223
224
225
|
# File 'lib/concurrent/channel.rb', line 223
def go(*args, &block)
go_via(GOROUTINES, *args, &block)
end
|
.go_loop(*args, &block) ⇒ Object
232
233
234
|
# File 'lib/concurrent/channel.rb', line 232
def go_loop(*args, &block)
go_loop_via(GOROUTINES, *args, &block)
end
|
.go_loop_via(executor, *args, &block) ⇒ Object
236
237
238
239
240
241
242
243
|
# File 'lib/concurrent/channel.rb', line 236
def go_loop_via(executor, *args, &block)
raise ArgumentError.new('no block given') unless block_given?
executor.post(block, *args) do
loop do
break unless block.call(*args)
end
end
end
|
.go_via(executor, *args, &block) ⇒ Object
227
228
229
230
|
# File 'lib/concurrent/channel.rb', line 227
def go_via(executor, *args, &block)
raise ArgumentError.new('no block given') unless block_given?
executor.post(*args, &block)
end
|
.select(*args) {|selector, args| ... } ⇒ Object
Also known as:
alt
215
216
217
218
219
220
|
# File 'lib/concurrent/channel.rb', line 215
def select(*args)
raise ArgumentError.new('no block given') unless block_given?
selector = Selector.new
yield(selector, *args)
selector.execute
end
|
.ticker(interval) ⇒ Object
Also known as:
tick
210
211
212
|
# File 'lib/concurrent/channel.rb', line 210
def ticker(interval)
Channel.new(Buffer::Ticker.new(interval))
end
|
.timer(seconds) ⇒ Object
Also known as:
after
205
206
207
|
# File 'lib/concurrent/channel.rb', line 205
def timer(seconds)
Channel.new(Buffer::Timer.new(seconds))
end
|
Instance Method Details
#each ⇒ Object
192
193
194
195
196
197
198
199
200
201
202
|
# File 'lib/concurrent/channel.rb', line 192
def each
raise ArgumentError.new('no block given') unless block_given?
loop do
item, more = do_next
if item != Concurrent::NULL
yield(item)
elsif !more
break
end
end
end
|
#next ⇒ Object
158
159
160
161
162
|
# File 'lib/concurrent/channel.rb', line 158
def next
item, more = do_next
item = nil if item == Concurrent::NULL
return item, more
end
|
#next? ⇒ Boolean
164
165
166
167
168
169
170
171
172
|
# File 'lib/concurrent/channel.rb', line 164
def next?
item, more = do_next
item = if item == Concurrent::NULL
Concurrent::Maybe.nothing
else
Concurrent::Maybe.just(item)
end
return item, more
end
|
#offer(item) ⇒ Object
98
99
100
101
|
# File 'lib/concurrent/channel.rb', line 98
def offer(item)
return false unless validate(item, false, false)
do_offer(item)
end
|
#offer!(item) ⇒ Object
103
104
105
106
107
108
|
# File 'lib/concurrent/channel.rb', line 103
def offer!(item)
validate(item, false, true)
ok = do_offer(item)
raise Error if !ok
ok
end
|
#offer?(item) ⇒ Boolean
110
111
112
113
114
115
116
117
118
|
# File 'lib/concurrent/channel.rb', line 110
def offer?(item)
if !validate(item, true, false)
Concurrent::Maybe.nothing('invalid value')
elsif do_offer(item)
Concurrent::Maybe.just(true)
else
Concurrent::Maybe.nothing
end
end
|
#poll ⇒ Object
174
175
176
|
# File 'lib/concurrent/channel.rb', line 174
def poll
(item = do_poll) == Concurrent::NULL ? nil : item
end
|
#poll! ⇒ Object
178
179
180
181
182
|
# File 'lib/concurrent/channel.rb', line 178
def poll!
item = do_poll
raise Error if item == Concurrent::NULL
item
end
|
#poll? ⇒ Boolean
184
185
186
187
188
189
190
|
# File 'lib/concurrent/channel.rb', line 184
def poll?
if (item = do_poll) == Concurrent::NULL
Concurrent::Maybe.nothing
else
Concurrent::Maybe.just(item)
end
end
|
#put(item) ⇒ Object
Also known as:
send, <<
74
75
76
77
|
# File 'lib/concurrent/channel.rb', line 74
def put(item)
return false unless validate(item, false, false)
do_put(item)
end
|
#put!(item) ⇒ Object
81
82
83
84
85
86
|
# File 'lib/concurrent/channel.rb', line 81
def put!(item)
validate(item, false, true)
ok = do_put(item)
raise Error if !ok
ok
end
|
#put?(item) ⇒ Boolean
88
89
90
91
92
93
94
95
96
|
# File 'lib/concurrent/channel.rb', line 88
def put?(item)
if !validate(item, true, false)
Concurrent::Maybe.nothing('invalid value')
elsif do_put(item)
Concurrent::Maybe.just(true)
else
Concurrent::Maybe.nothing
end
end
|
#take ⇒ Object
Also known as:
receive, ~
120
121
122
123
|
# File 'lib/concurrent/channel.rb', line 120
def take
item = do_take
item == Concurrent::NULL ? nil : item
end
|
#take! ⇒ Object
127
128
129
130
131
|
# File 'lib/concurrent/channel.rb', line 127
def take!
item = do_take
raise Error if item == Concurrent::NULL
item
end
|
#take? ⇒ Boolean
133
134
135
136
137
138
139
140
141
|
# File 'lib/concurrent/channel.rb', line 133
def take?
item = do_take
item = if item == Concurrent::NULL
Concurrent::Maybe.nothing
else
Concurrent::Maybe.just(item)
end
item
end
|