Class: Pwrake::NBIO::MultiReader

Inherits:
Reader
  • Object
show all
Defined in:
lib/pwrake/nbio.rb

Overview


Instance Attribute Summary collapse

Attributes inherited from Reader

#check_timeout, #io, #waiter

Instance Method Summary collapse

Methods inherited from Reader

#_read, #eof?, #read, #read_line_nonblock, #read_until, #readln, #select_io

Constructor Details

#initialize(selector, io, n_chan = 0) ⇒ MultiReader

Returns a new instance of MultiReader.



348
349
350
351
352
353
354
# File 'lib/pwrake/nbio.rb', line 348

def initialize(selector, io, n_chan=0)
  super(selector, io)
  @n_chan = n_chan
  @queue = @n_chan.times.map{|i| FiberReaderQueue.new(self)}
  @default_queue = FiberReaderQueue.new(self)
  @check_timeout = true
end

Instance Attribute Details

#default_queueObject

Returns the value of attribute default_queue.



356
357
358
# File 'lib/pwrake/nbio.rb', line 356

def default_queue
  @default_queue
end

#queueObject (readonly)

Returns the value of attribute queue.



355
356
357
# File 'lib/pwrake/nbio.rb', line 355

def queue
  @queue
end

Instance Method Details

#[](ch) ⇒ Object



358
359
360
# File 'lib/pwrake/nbio.rb', line 358

def [](ch)
  @queue[ch]
end

#callObject



378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
# File 'lib/pwrake/nbio.rb', line 378

def call
  while line = read_line_nonblock
    if /^(\d+):(.*)$/ =~ line
      ch,str = $1,$2
      if q = @queue[ch.to_i]
        q.enq(str)
      else
        raise "No queue ##{ch}, received: #{line}"
      end
    elsif @default_queue
      @default_queue.enq(line)
    else
      raise "No default_queue, received: #{line}"
    end
  end
rescue EOFError
  halt
rescue IO::WaitReadable
end

#error(e) ⇒ Object



398
399
400
401
402
# File 'lib/pwrake/nbio.rb', line 398

def error(e)
  @closed = true
  @queue.each{|q| q.enq(e)}
  @default_queue.enq(e)
end

#get_line(ch = nil) ⇒ Object

call from Fiber context



370
371
372
373
374
375
376
# File 'lib/pwrake/nbio.rb', line 370

def get_line(ch=nil)
  if ch && !@queue.empty?
    @queue[ch].deq
  else
    @default_queue.deq
  end
end

#haltObject



404
405
406
407
408
# File 'lib/pwrake/nbio.rb', line 404

def halt
  Log.debug("Handler.halt") if defined? Log
  @queue.each{|q| q.halt}
  @default_queue.halt
end

#new_queueObject



362
363
364
365
366
367
# File 'lib/pwrake/nbio.rb', line 362

def new_queue
  n = @n_chan
  @queue << q = FiberReaderQueue.new(self)
  @n_chan += 1
  [n,q]
end