Class: Concurrent::Channel

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Enumerable
Defined in:
lib/concurrent/channel.rb,
lib/concurrent/channel/tick.rb,
lib/concurrent/channel/selector.rb,
lib/concurrent/channel/buffer/base.rb,
lib/concurrent/channel/buffer/timer.rb,
lib/concurrent/channel/buffer/ticker.rb,
lib/concurrent/channel/buffer/sliding.rb,
lib/concurrent/channel/buffer/buffered.rb,
lib/concurrent/channel/buffer/dropping.rb,
lib/concurrent/channel/buffer/unbuffered.rb,
lib/concurrent/channel/selector/put_clause.rb,
lib/concurrent/channel/selector/take_clause.rb,
lib/concurrent/channel/selector/after_clause.rb,
lib/concurrent/channel/selector/error_clause.rb,
lib/concurrent/channel/selector/default_clause.rb

Overview

Defined Under Namespace

Modules: Buffer Classes: Tick, ValidationError

Constant Summary collapse

Error =
Class.new(StandardError)

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Channel

Returns a new instance of Channel.



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/concurrent/channel.rb', line 46

# Builds a channel and selects its buffer from the given options.
#
# @param opts [Hash, Buffer::Base] an options hash, or (internal use only) a
#   ready-made buffer instance that is installed as-is. Note that on the
#   internal path the method returns before a validator is assigned.
# @option opts [Integer] :capacity requested buffer capacity
# @option opts [Integer] :size consulted only when +:capacity+ is absent
# @option opts [Symbol] :buffer key looked up in +BUFFER_TYPES+
# @option opts [Object] :validator stored as the channel validator
#   (defaults to +DEFAULT_VALIDATOR+)
# @raise [ArgumentError] when a capacity is combined with +:unbuffered+, when
#   the capacity is missing or below 1 for a sized buffer, or when +:buffer+
#   names a type that +BUFFER_TYPES+ does not contain
def initialize(opts = {})
  # undocumented -- for internal use only
  if opts.is_a? Buffer::Base
    self.buffer = opts
    return
  end

  capacity = opts[:capacity] || opts[:size]
  buffer = opts[:buffer]

  if capacity && buffer == :unbuffered
    raise ArgumentError, 'unbuffered channels cannot have a capacity'
  elsif capacity.nil? && buffer.nil?
    # nothing requested at all: fall back to the unbuffered type
    self.buffer = BUFFER_TYPES[:unbuffered].new
  elsif capacity == 0 && buffer == :buffered
    # a zero-capacity :buffered request is mapped to the unbuffered type
    self.buffer = BUFFER_TYPES[:unbuffered].new
  elsif buffer == :unbuffered
    self.buffer = BUFFER_TYPES[:unbuffered].new
  elsif capacity.nil? || capacity < 1
    raise ArgumentError, 'capacity must be at least 1 for this buffer type'
  else
    buffer ||= :buffered
    buffer_class = BUFFER_TYPES[buffer]
    # Fail with a clear message instead of a NoMethodError on nil when the
    # caller passes a buffer type that is not registered.
    raise ArgumentError, "unknown buffer type: #{buffer.inspect}" if buffer_class.nil?
    self.buffer = buffer_class.new(capacity)
  end

  self.validator = opts.fetch(:validator, DEFAULT_VALIDATOR)
end

Class Method Details

.go(*args, &block) ⇒ Object



223
224
225
# File 'lib/concurrent/channel.rb', line 223

# Hands the block and its arguments to go_via, using the GOROUTINES
# constant as the executor.
#
# @param args [Array] forwarded unchanged to go_via
# @return [Object] whatever go_via returns
def go(*args, &block)
  executor = GOROUTINES
  go_via(executor, *args, &block)
end

.go_loop(*args, &block) ⇒ Object



232
233
234
# File 'lib/concurrent/channel.rb', line 232

# Hands the block and its arguments to go_loop_via, using the GOROUTINES
# constant as the executor.
#
# @param args [Array] forwarded unchanged to go_loop_via
# @return [Object] whatever go_loop_via returns
def go_loop(*args, &block)
  executor = GOROUTINES
  go_loop_via(executor, *args, &block)
end

.go_loop_via(executor, *args, &block) ⇒ Object

Raises:

  • (ArgumentError)


236
237
238
239
240
241
242
243
# File 'lib/concurrent/channel.rb', line 236

# Posts a task to the executor that calls the given block repeatedly with
# +args+ until the block returns a falsy value.
#
# @param executor [#post] receives the block and +args+ as post arguments
# @param args [Array] passed to every invocation of the block
# @raise [ArgumentError] when no block is supplied
# @return [Object] whatever +executor.post+ returns
def go_loop_via(executor, *args, &block)
  raise ArgumentError, 'no block given' if block.nil?

  executor.post(block, *args) do
    # Kernel#loop is kept deliberately: it also ends quietly on StopIteration.
    loop { block.call(*args) || break }
  end
end

.go_via(executor, *args, &block) ⇒ Object

Raises:

  • (ArgumentError)


227
228
229
230
# File 'lib/concurrent/channel.rb', line 227

# Posts the given block, together with +args+, to the executor.
#
# @param executor [#post] the object the work is posted to
# @param args [Array] forwarded to +executor.post+
# @raise [ArgumentError] when no block is supplied
# @return [Object] whatever +executor.post+ returns
def go_via(executor, *args, &block)
  raise ArgumentError, 'no block given' if block.nil?

  executor.post(*args, &block)
end

.select(*args) {|selector, args| ... } ⇒ Object Also known as: alt

Yields:

  • (selector, args)

Raises:

  • (ArgumentError)


215
216
217
218
219
220
# File 'lib/concurrent/channel.rb', line 215

# Creates a Selector, yields it (followed by +args+) to the block so clauses
# can be registered, then executes it.
#
# @param args [Array] extra values yielded after the selector
# @yield [selector, *args]
# @raise [ArgumentError] when no block is supplied
# @return [Object] the result of +Selector#execute+
def select(*args)
  raise ArgumentError, 'no block given' unless block_given?

  Selector.new.tap { |selector| yield(selector, *args) }.execute
end

.ticker(interval) ⇒ Object Also known as: tick



210
211
212
# File 'lib/concurrent/channel.rb', line 210

# Builds a channel whose buffer is a Buffer::Ticker created from +interval+.
#
# @param interval [Object] passed straight to +Buffer::Ticker.new+
# @return [Channel]
def ticker(interval)
  tick_buffer = Buffer::Ticker.new(interval)
  Channel.new(tick_buffer)
end

.timer(seconds) ⇒ Object Also known as: after



205
206
207
# File 'lib/concurrent/channel.rb', line 205

# Builds a channel whose buffer is a Buffer::Timer created from +seconds+.
#
# @param seconds [Object] passed straight to +Buffer::Timer.new+
# @return [Channel]
def timer(seconds)
  timer_buffer = Buffer::Timer.new(seconds)
  Channel.new(timer_buffer)
end

Instance Method Details

#each ⇒ Object

Raises:

  • (ArgumentError)


192
193
194
195
196
197
198
199
200
201
202
# File 'lib/concurrent/channel.rb', line 192

# Yields every item obtained from do_next until do_next reports that
# nothing more is coming.
#
# Results equal to Concurrent::NULL are never yielded; such a result ends
# the iteration only when the accompanying "more" flag is falsy.
#
# @yield [item] each non-NULL item
# @raise [ArgumentError] when no block is supplied
def each
  raise ArgumentError, 'no block given' unless block_given?

  loop do
    value, more = do_next
    if value != Concurrent::NULL
      yield value
      next
    end
    break unless more
  end
end

#next ⇒ Object

Examples:


jobs = Channel.new

Channel.go do
  loop do
    j, more = jobs.next
    if more
      print "received job #{j}\n"
    else
      print "received all jobs\n"
      break
    end
  end
end


158
159
160
161
162
# File 'lib/concurrent/channel.rb', line 158

# Fetches the next result from do_next as an +[item, more]+ pair, replacing
# an item equal to Concurrent::NULL with nil.
#
# @return [Array(Object, Object)] the item (or nil) and the "more" flag
def next
  value, more = do_next
  [value == Concurrent::NULL ? nil : value, more]
end

#next? ⇒ Boolean

Returns:

  • (Boolean)


164
165
166
167
168
169
170
171
172
# File 'lib/concurrent/channel.rb', line 164

# Fetches the next result from do_next and wraps the item in a
# Concurrent::Maybe: +nothing+ when the item equals Concurrent::NULL,
# +just(item)+ otherwise.
#
# @return [Array] the Maybe and the "more" flag reported by do_next
def next?
  value, more = do_next
  if value == Concurrent::NULL
    [Concurrent::Maybe.nothing, more]
  else
    [Concurrent::Maybe.just(value), more]
  end
end

#offer(item) ⇒ Object



98
99
100
101
# File 'lib/concurrent/channel.rb', line 98

# Offers an item via do_offer, provided +validate(item, false, false)+
# accepts it.
#
# @param item [Object] the value to offer
# @return [Object] false when validation fails, otherwise the result of
#   do_offer
def offer(item)
  validate(item, false, false) ? do_offer(item) : false
end

#offer!(item) ⇒ Object

Raises:

  • (Error)


103
104
105
106
107
108
# File 'lib/concurrent/channel.rb', line 103

# Offers an item via do_offer after calling +validate(item, false, true)+,
# raising when the offer is not accepted.
#
# @param item [Object] the value to offer
# @raise [Error] when do_offer returns a falsy value
# @return [Object] the truthy result of do_offer
def offer!(item)
  validate(item, false, true)
  do_offer(item) || raise(Error)
end

#offer?(item) ⇒ Boolean

Returns:

  • (Boolean)


110
111
112
113
114
115
116
117
118
# File 'lib/concurrent/channel.rb', line 110

# Offers an item and reports the outcome as a Concurrent::Maybe.
#
# @param item [Object] the value to offer
# @return [Object] +Maybe.nothing('invalid value')+ when
#   +validate(item, true, false)+ rejects the item, +Maybe.just(true)+ when
#   do_offer succeeds, and a plain +Maybe.nothing+ otherwise
def offer?(item)
  return Concurrent::Maybe.nothing('invalid value') unless validate(item, true, false)
  return Concurrent::Maybe.just(true) if do_offer(item)

  Concurrent::Maybe.nothing
end

#poll ⇒ Object



174
175
176
# File 'lib/concurrent/channel.rb', line 174

# Polls via do_poll and returns the item, or nil when the result equals
# Concurrent::NULL.
#
# @return [Object, nil]
def poll
  polled = do_poll
  return nil if polled == Concurrent::NULL

  polled
end

#poll! ⇒ Object

Raises:

  • (Error)


178
179
180
181
182
# File 'lib/concurrent/channel.rb', line 178

# Polls via do_poll and returns the item, raising when nothing was
# available.
#
# @raise [Error] when do_poll returns a value equal to Concurrent::NULL
# @return [Object] the polled item
def poll!
  polled = do_poll
  return polled unless polled == Concurrent::NULL

  raise Error
end

#poll? ⇒ Boolean

Returns:

  • (Boolean)


184
185
186
187
188
189
190
# File 'lib/concurrent/channel.rb', line 184

# Polls via do_poll and wraps the outcome in a Concurrent::Maybe.
#
# @return [Object] +Maybe.nothing+ when the result equals Concurrent::NULL,
#   otherwise +Maybe.just(item)+
def poll?
  polled = do_poll
  polled == Concurrent::NULL ? Concurrent::Maybe.nothing : Concurrent::Maybe.just(polled)
end

#put(item) ⇒ Object Also known as: send, <<



74
75
76
77
# File 'lib/concurrent/channel.rb', line 74

# Puts an item via do_put, provided +validate(item, false, false)+ accepts
# it.
#
# @param item [Object] the value to put
# @return [Object] false when validation fails, otherwise the result of
#   do_put
def put(item)
  if validate(item, false, false)
    do_put(item)
  else
    false
  end
end

#put!(item) ⇒ Object

Raises:

  • (Error)


81
82
83
84
85
86
# File 'lib/concurrent/channel.rb', line 81

# Puts an item via do_put after calling +validate(item, false, true)+,
# raising when the put is not accepted.
#
# @param item [Object] the value to put
# @raise [Error] when do_put returns a falsy value
# @return [Object] the truthy result of do_put
def put!(item)
  validate(item, false, true)
  accepted = do_put(item)
  return accepted if accepted

  raise Error
end

#put?(item) ⇒ Boolean

Returns:

  • (Boolean)


88
89
90
91
92
93
94
95
96
# File 'lib/concurrent/channel.rb', line 88

# Puts an item and reports the outcome as a Concurrent::Maybe.
#
# @param item [Object] the value to put
# @return [Object] +Maybe.nothing('invalid value')+ when
#   +validate(item, true, false)+ rejects the item, +Maybe.just(true)+ when
#   do_put succeeds, and a plain +Maybe.nothing+ otherwise
def put?(item)
  return Concurrent::Maybe.nothing('invalid value') unless validate(item, true, false)

  do_put(item) ? Concurrent::Maybe.just(true) : Concurrent::Maybe.nothing
end

#take ⇒ Object Also known as: receive, ~



120
121
122
123
# File 'lib/concurrent/channel.rb', line 120

# Takes an item via do_take and returns it, or nil when the result equals
# Concurrent::NULL.
#
# @return [Object, nil]
def take
  taken = do_take
  return nil if taken == Concurrent::NULL

  taken
end

#take! ⇒ Object

Raises:

  • (Error)


127
128
129
130
131
# File 'lib/concurrent/channel.rb', line 127

# Takes an item via do_take and returns it, raising when nothing was
# obtained.
#
# @raise [Error] when do_take returns a value equal to Concurrent::NULL
# @return [Object] the taken item
def take!
  taken = do_take
  if taken == Concurrent::NULL
    raise Error
  else
    taken
  end
end

#take? ⇒ Boolean

Returns:

  • (Boolean)


133
134
135
136
137
138
139
140
141
# File 'lib/concurrent/channel.rb', line 133

# Takes an item via do_take and wraps the outcome in a Concurrent::Maybe.
#
# @return [Object] +Maybe.nothing+ when the result equals Concurrent::NULL,
#   otherwise +Maybe.just(item)+
def take?
  taken = do_take
  return Concurrent::Maybe.nothing if taken == Concurrent::NULL

  Concurrent::Maybe.just(taken)
end