Module: Rbgo::Channel

Included in:
NetworkServiceFactory
Defined in:
lib/rbgo/select_chan.rb

Defined Under Namespace

Modules: Chan Classes: BufferChan, NonBufferChan

Class Method Summary collapse

Class Method Details

.on_read(chan:, &blk) ⇒ Object

on_read

Raises:

  • (ArgumentError)


317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# File 'lib/rbgo/select_chan.rb', line 317

# Builds a non-blocking "receive" operation for +chan+.
#
# The returned Proc, when called, invokes +chan.deq(true)+ and destructures
# the result into a value and an ok flag. With a block, the pair is passed to
# the block and the block's result is returned; without one, the pair itself
# is returned as a two-element Array. Anything raised by +deq+ propagates to
# the caller of the Proc.
#
# The Proc also gets a singleton +register(io_w)+ method that forwards +io_w+
# to the channel's +register+ method via +send+.
#
# @param chan [Chan] channel to read from
# @yield [value, ok] optional handler for the dequeued pair
# @return [Proc] the operation, responding to +call+ and +register+
# @raise [ArgumentError] if +chan+ is not a Chan
def on_read(chan:, &blk)
  raise ArgumentError, 'chan must be a Chan' unless chan.is_a?(Chan)

  reader = proc do
    value, ok = chan.deq(true)
    blk ? blk.call(value, ok) : [value, ok]
  end

  # +send+ is used so the call goes through even if +register+ is not public.
  reader.define_singleton_method(:register) { |io_w| chan.send(:register, io_w) }
  reader
end

.on_write(chan:, obj:, &blk) ⇒ Object

on_write

Raises:

  • (ArgumentError)


339
340
341
342
343
344
345
346
347
348
349
350
# File 'lib/rbgo/select_chan.rb', line 339

# Builds a non-blocking "send" operation that enqueues +obj+ on +chan+.
#
# The returned Proc, when called, invokes +chan.enq(obj, true)+. With a block,
# the block is then called with no arguments and its result is returned;
# without one, the result of +enq+ is returned. Anything raised by +enq+
# propagates to the caller of the Proc (the block is not run in that case).
#
# The Proc also gets a singleton +register(io_w)+ method that forwards +io_w+
# to the channel's +register+ method via +send+.
#
# @param chan [Chan] channel to write to
# @param obj [Object] value to enqueue
# @yield optional handler run after a successful enqueue
# @return [Proc] the operation, responding to +call+ and +register+
# @raise [ArgumentError] if +chan+ is not a Chan
def on_write(chan:, obj:, &blk)
  raise ArgumentError, 'chan must be a Chan' unless chan.is_a?(Chan)

  writer = proc do
    enq_result = chan.enq(obj, true)
    blk ? blk.call : enq_result
  end

  # +send+ is used so the call goes through even if +register+ is not public.
  writer.define_singleton_method(:register) { |io_w| chan.send(:register, io_w) }
  writer
end

.select_chan(*ops) ⇒ Object

select_chan



264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
# File 'lib/rbgo/select_chan.rb', line 264

# Tries several channel operations and returns the result of the first one
# that succeeds.
#
# Every op must respond to +register(io_w)+ and +call+. An op reports
# "cannot proceed right now" by raising ThreadError from +call+; any other
# exception propagates to the caller.
#
# Each round:
# 1. closes the pipes of the previous round and creates a fresh pipe per op,
#    handing the write end to +op.register+;
# 2. calls every op once (in shuffled order) and returns the first result;
# 3. if none succeeded and a block was given, returns the block's result;
# 4. otherwise blocks in +IO.select+ on the read ends, retries the ops whose
#    pipe became readable, and starts a new round if those fail too.
#
# NOTE(review): presumably a channel writes to (or closes) the registered
# write end when its state changes, which is what wakes +IO.select+ — confirm
# against Chan#register.
#
# All pipes created here are closed when the method exits, whether by return
# or by exception.
#
# @param ops [Array<#call, #register>] operations to choose between
# @yield optional default branch, used when no op is ready on the first pass
# @return [Object] result of the chosen op, or of the block
def select_chan(*ops)
  # +ops+ is this call's own splat Array, so shuffling in place is safe.
  ops.shuffle!

  io_hash      = {} # read end => [op, write end]
  close_io_blk = proc do
    io_hash.each do |io_r, (_op, io_w)|
      # Best-effort close: an end may already be closed by the other side.
      [io_r, io_w].each { |io| io.close rescue nil }
    end
  end

  begin
    # `while true` rather than `loop`: `loop` would swallow a StopIteration
    # raised by an op.
    while true do
      close_io_blk.call
      io_hash.clear

      ops.each do |op|
        io_r, io_w = IO.pipe
        # Track the pipe BEFORE registering it, so the ensure clause below
        # still closes both ends if +register+ raises.
        io_hash[io_r] = [op, io_w]
        op.register(io_w)
      end

      ops.each do |op|
        begin
          return op.call
        rescue ThreadError
          # op not ready; try the next one
        end
      end

      return yield if block_given?

      # Any error from IO.select is treated as "nothing readable" and simply
      # triggers another round.
      read_ios = IO.select(io_hash.keys).first rescue []

      read_ios.each do |io_r|
        op = io_hash[io_r].first
        begin
          return op.call
        rescue ThreadError
          # lost the race for this op; fall through to the next round
        end
      end
    end
  ensure
    close_io_blk.call
  end
end