Class: Concurrent::Channel

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Enumerable
Defined in:
lib/concurrent/channel.rb,
lib/concurrent/channel/tick.rb,
lib/concurrent/channel/selector.rb,
lib/concurrent/channel/buffer/base.rb,
lib/concurrent/channel/buffer/timer.rb,
lib/concurrent/channel/buffer/ticker.rb,
lib/concurrent/channel/buffer/sliding.rb,
lib/concurrent/channel/buffer/buffered.rb,
lib/concurrent/channel/buffer/dropping.rb,
lib/concurrent/channel/buffer/unbuffered.rb,
lib/concurrent/channel/selector/put_clause.rb,
lib/concurrent/channel/selector/take_clause.rb,
lib/concurrent/channel/selector/after_clause.rb,
lib/concurrent/channel/selector/error_clause.rb,
lib/concurrent/channel/selector/default_clause.rb

Overview

Defined Under Namespace

Modules: Buffer Classes: Tick, ValidationError

Constant Summary collapse

Error =
Class.new(StandardError)

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Channel



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/concurrent/channel.rb', line 46

# Builds a channel from an options hash or, internally, from a ready-made
# buffer object.
#
# When +opts+ is a +Buffer::Base+ it is adopted as the channel's buffer and
# the method returns immediately; no validator is assigned on that path.
#
# @param opts [Hash, Buffer::Base] channel options, or a buffer instance
# @option opts [Integer] :capacity buffer capacity (+:size+ is read when
#   +:capacity+ is absent or falsy)
# @option opts [Symbol] :buffer key into +BUFFER_TYPES+; +:buffered+ is used
#   when a capacity is given without a buffer type
# @option opts [Object] :validator stored as the channel's validator;
#   +DEFAULT_VALIDATOR+ when the key is missing
# @raise [ArgumentError] if a capacity is combined with +:unbuffered+, if a
#   capacity is required but missing or below 1, or if +:buffer+ has no
#   entry in +BUFFER_TYPES+
def initialize(opts = {})
  # undocumented -- for internal use only
  if opts.is_a? Buffer::Base
    self.buffer = opts
    return
  end

  capacity = opts[:capacity] || opts[:size]
  buffer = opts[:buffer]

  if capacity && buffer == :unbuffered
    raise ArgumentError.new('unbuffered channels cannot have a capacity')
  elsif capacity.nil? && buffer.nil?
    self.buffer = BUFFER_TYPES[:unbuffered].new
  elsif capacity == 0 && buffer == :buffered
    self.buffer = BUFFER_TYPES[:unbuffered].new
  elsif buffer == :unbuffered
    self.buffer = BUFFER_TYPES[:unbuffered].new
  elsif capacity.nil? || capacity < 1
    raise ArgumentError.new('capacity must be at least 1 for this buffer type')
  else
    buffer ||= :buffered
    buffer_class = BUFFER_TYPES[buffer]
    # An unrecognised :buffer key previously surfaced as NoMethodError on nil;
    # report it as an argument problem like the other option checks.
    raise ArgumentError.new("unknown buffer type: #{buffer.inspect}") if buffer_class.nil?
    self.buffer = buffer_class.new(capacity)
  end

  self.validator = opts.fetch(:validator, DEFAULT_VALIDATOR)
end

Class Method Details

.go(*args, &block) ⇒ Object



223
224
225
# File 'lib/concurrent/channel.rb', line 223

# Delegates to +go_via+ with the +GOROUTINES+ executor.
#
# @param args [Array] forwarded unchanged to +go_via+
# @return whatever +go_via+ returns
def go(*args, &block)
  go_via(GOROUTINES, *args, &block)
end

.go_loop(*args, &block) ⇒ Object



232
233
234
# File 'lib/concurrent/channel.rb', line 232

# Delegates to +go_loop_via+ with the +GOROUTINES+ executor.
#
# @param args [Array] forwarded unchanged to +go_loop_via+
# @return whatever +go_loop_via+ returns
def go_loop(*args, &block)
  go_loop_via(GOROUTINES, *args, &block)
end

.go_loop_via(executor, *args, &block) ⇒ Object

Raises:

  • (ArgumentError)


236
237
238
239
240
241
242
243
# File 'lib/concurrent/channel.rb', line 236

# Posts a task to +executor+ that calls +block+ with +args+ repeatedly and
# stops the first time the block returns a falsy value.
#
# @param executor [#post] receives the looping task
# @param args [Array] arguments given to +block+ on every iteration
# @return whatever +executor.post+ returns
# @raise [ArgumentError] if no block is given
def go_loop_via(executor, *args, &block)
  raise ArgumentError, 'no block given' if block.nil?

  # +block+ and +args+ are handed to +post+, but the task itself reads them
  # from the enclosing scope rather than from its own parameters.
  executor.post(block, *args) do
    loop { block.call(*args) || break }
  end
end

.go_via(executor, *args, &block) ⇒ Object

Raises:

  • (ArgumentError)


227
228
229
230
# File 'lib/concurrent/channel.rb', line 227

# Hands +block+ and +args+ to +executor.post+.
#
# @param executor [#post] receives the block
# @param args [Array] forwarded unchanged to +executor.post+
# @return whatever +executor.post+ returns
# @raise [ArgumentError] if no block is given
def go_via(executor, *args, &block)
  raise ArgumentError, 'no block given' if block.nil?

  executor.post(*args, &block)
end

.select(*args) {|selector, args| ... } ⇒ Object Also known as: alt

Yields:

  • (selector, args)

Raises:

  • (ArgumentError)


215
216
217
218
219
220
# File 'lib/concurrent/channel.rb', line 215

# Creates a +Selector+, yields it (followed by +args+) to the block so the
# caller can configure it, then runs +Selector#execute+.
#
# @param args [Array] extra values yielded after the selector
# @yield [selector, *args]
# @return whatever +Selector#execute+ returns
# @raise [ArgumentError] if no block is given
def select(*args)
  raise ArgumentError, 'no block given' unless block_given?

  Selector.new.tap { |selector| yield(selector, *args) }.execute
end

.ticker(interval) ⇒ Object Also known as: tick



210
211
212
# File 'lib/concurrent/channel.rb', line 210

# Builds a channel around a new +Buffer::Ticker+.
#
# @param interval passed unchanged to +Buffer::Ticker.new+
#   (NOTE(review): presumably a duration -- confirm units against Buffer::Ticker)
# @return [Channel] the new channel
def ticker(interval)
  Channel.new(Buffer::Ticker.new(interval))
end

.timer(seconds) ⇒ Object Also known as: after



205
206
207
# File 'lib/concurrent/channel.rb', line 205

# Builds a channel around a new +Buffer::Timer+.
#
# @param seconds passed unchanged to +Buffer::Timer.new+
# @return [Channel] the new channel
def timer(seconds)
  Channel.new(Buffer::Timer.new(seconds))
end

Instance Method Details

#each ⇒ Object

Raises:

  • (ArgumentError)


192
193
194
195
196
197
198
199
200
201
202
# File 'lib/concurrent/channel.rb', line 192

# Repeatedly calls +do_next+ and yields every item that is not
# +Concurrent::NULL+. Iteration ends when +do_next+ reports a
# +Concurrent::NULL+ item together with a falsy "more" flag.
#
# @yield [item] each non-NULL item
# @return [nil]
# @raise [ArgumentError] if no block is given
def each
  raise ArgumentError, 'no block given' unless block_given?

  loop do
    item, more = do_next
    if item != Concurrent::NULL
      yield item
      next
    end
    # A NULL item with more still pending is skipped; without more, stop.
    break unless more
  end
end

#next ⇒ Object

Examples:


jobs = Channel.new

Channel.go do
  loop do
    j, more = jobs.next
    if more
      print "received job #{j}\n"
    else
      print "received all jobs\n"
      break
    end
  end
end


158
159
160
161
162
# File 'lib/concurrent/channel.rb', line 158

# Fetches one result from +do_next+ and returns it as an +[item, more]+ pair,
# with a +Concurrent::NULL+ item reported as +nil+.
#
# @return [Array] two elements: the item (or +nil+) and the "more" flag
def next
  item, more = do_next
  [item == Concurrent::NULL ? nil : item, more]
end

#next? ⇒ Boolean



164
165
166
167
168
169
170
171
172
# File 'lib/concurrent/channel.rb', line 164

# Like +next+, but wraps the item in a +Concurrent::Maybe+: +nothing+ when
# +do_next+ produced +Concurrent::NULL+, otherwise +just(item)+.
#
# @return [Array] two elements: the Maybe and the "more" flag
def next?
  item, more = do_next
  maybe = item == Concurrent::NULL ? Concurrent::Maybe.nothing : Concurrent::Maybe.just(item)
  [maybe, more]
end

#offer(item) ⇒ Object



98
99
100
101
# File 'lib/concurrent/channel.rb', line 98

# Validates +item+ and, if it passes, hands it to +do_offer+.
#
# @param item the value to offer
# @return [false] when +validate+ rejects the item; otherwise whatever
#   +do_offer+ returns
def offer(item)
  if validate(item, false, false)
    do_offer(item)
  else
    false
  end
end

#offer!(item) ⇒ Object

Raises:

  • (Error)


103
104
105
106
107
108
# File 'lib/concurrent/channel.rb', line 103

# Validates +item+ (asking +validate+ to raise on failure), then hands it to
# +do_offer+ and insists on a truthy result.
#
# @param item the value to offer
# @return the truthy value returned by +do_offer+
# @raise [Error] if +do_offer+ returns a falsy value
def offer!(item)
  validate(item, false, true)
  do_offer(item) || raise(Error)
end

#offer?(item) ⇒ Boolean



110
111
112
113
114
115
116
117
118
# File 'lib/concurrent/channel.rb', line 110

# Offers +item+ and reports the outcome as a +Concurrent::Maybe+.
#
# @param item the value to offer
# @return +Maybe.nothing('invalid value')+ when +validate+ rejects the item,
#   +Maybe.just(true)+ when +do_offer+ succeeds, +Maybe.nothing+ otherwise
def offer?(item)
  return Concurrent::Maybe.nothing('invalid value') unless validate(item, true, false)
  return Concurrent::Maybe.just(true) if do_offer(item)

  Concurrent::Maybe.nothing
end

#poll ⇒ Object



174
175
176
# File 'lib/concurrent/channel.rb', line 174

# Returns the result of +do_poll+, with +Concurrent::NULL+ reported as +nil+.
#
# @return the polled item, or +nil+ when +do_poll+ produced +Concurrent::NULL+
def poll
  polled = do_poll
  return nil if polled == Concurrent::NULL

  polled
end

#poll! ⇒ Object

Raises:

  • (Error)


178
179
180
181
182
# File 'lib/concurrent/channel.rb', line 178

# Returns the result of +do_poll+, refusing a +Concurrent::NULL+ result.
#
# @return the polled item
# @raise [Error] if +do_poll+ produced +Concurrent::NULL+
def poll!
  polled = do_poll
  return polled unless polled == Concurrent::NULL

  raise Error
end

#poll? ⇒ Boolean



184
185
186
187
188
189
190
# File 'lib/concurrent/channel.rb', line 184

# Returns the result of +do_poll+ wrapped in a +Concurrent::Maybe+.
#
# @return +Maybe.nothing+ when +do_poll+ produced +Concurrent::NULL+,
#   otherwise +Maybe.just(item)+
def poll?
  polled = do_poll
  polled == Concurrent::NULL ? Concurrent::Maybe.nothing : Concurrent::Maybe.just(polled)
end

#put(item) ⇒ Object Also known as: send, <<



74
75
76
77
# File 'lib/concurrent/channel.rb', line 74

# Validates +item+ and, if it passes, hands it to +do_put+.
#
# @param item the value to put
# @return [false] when +validate+ rejects the item; otherwise whatever
#   +do_put+ returns
def put(item)
  if validate(item, false, false)
    do_put(item)
  else
    false
  end
end

#put!(item) ⇒ Object

Raises:

  • (Error)


81
82
83
84
85
86
# File 'lib/concurrent/channel.rb', line 81

# Validates +item+ (asking +validate+ to raise on failure), then hands it to
# +do_put+ and insists on a truthy result.
#
# @param item the value to put
# @return the truthy value returned by +do_put+
# @raise [Error] if +do_put+ returns a falsy value
def put!(item)
  validate(item, false, true)
  do_put(item) || raise(Error)
end

#put?(item) ⇒ Boolean



88
89
90
91
92
93
94
95
96
# File 'lib/concurrent/channel.rb', line 88

# Puts +item+ and reports the outcome as a +Concurrent::Maybe+.
#
# @param item the value to put
# @return +Maybe.nothing('invalid value')+ when +validate+ rejects the item,
#   +Maybe.just(true)+ when +do_put+ succeeds, +Maybe.nothing+ otherwise
def put?(item)
  return Concurrent::Maybe.nothing('invalid value') unless validate(item, true, false)
  return Concurrent::Maybe.just(true) if do_put(item)

  Concurrent::Maybe.nothing
end

#take ⇒ Object Also known as: receive, ~



120
121
122
123
# File 'lib/concurrent/channel.rb', line 120

# Returns the result of +do_take+, with +Concurrent::NULL+ reported as +nil+.
#
# @return the taken item, or +nil+ when +do_take+ produced +Concurrent::NULL+
def take
  taken = do_take
  return nil if taken == Concurrent::NULL

  taken
end

#take! ⇒ Object

Raises:

  • (Error)


127
128
129
130
131
# File 'lib/concurrent/channel.rb', line 127

# Returns the result of +do_take+, refusing a +Concurrent::NULL+ result.
#
# @return the taken item
# @raise [Error] if +do_take+ produced +Concurrent::NULL+
def take!
  taken = do_take
  return taken unless taken == Concurrent::NULL

  raise Error
end

#take? ⇒ Boolean



133
134
135
136
137
138
139
140
141
# File 'lib/concurrent/channel.rb', line 133

# Returns the result of +do_take+ wrapped in a +Concurrent::Maybe+.
#
# @return +Maybe.nothing+ when +do_take+ produced +Concurrent::NULL+,
#   otherwise +Maybe.just(item)+
def take?
  taken = do_take
  return Concurrent::Maybe.nothing if taken == Concurrent::NULL

  Concurrent::Maybe.just(taken)
end