Class: Pwrake::NBIO::MultiReader
Overview
Instance Attribute Summary collapse
-
#default_queue ⇒ Object
Returns the value of attribute default_queue.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Attributes inherited from Reader
Instance Method Summary collapse
- #[](ch) ⇒ Object
- #call ⇒ Object
- #error(e) ⇒ Object
-
#get_line(ch = nil) ⇒ Object
Call from a Fiber context.
- #halt ⇒ Object
-
#initialize(selector, io, n_chan = 0) ⇒ MultiReader
constructor
A new instance of MultiReader.
- #new_queue ⇒ Object
Methods inherited from Reader
#_read, #eof?, #read, #read_line_nonblock, #read_until, #readln, #select_io
Constructor Details
#initialize(selector, io, n_chan = 0) ⇒ MultiReader
Returns a new instance of MultiReader.
348 349 350 351 352 353 354 |
# File 'lib/pwrake/nbio.rb', line 348
# Builds a reader that distributes incoming lines over per-channel queues.
#
# selector and io are forwarded unchanged to the superclass constructor.
# n_chan is the number of per-channel queues created up front (default 0);
# one extra default queue is always created.
def initialize(selector, io, n_chan=0)
  super(selector, io)
  @n_chan = n_chan
  # One FiberReaderQueue per channel; the array index is the channel number.
  @queue = @n_chan.times.map { FiberReaderQueue.new(self) }
  @default_queue = FiberReaderQueue.new(self)
  # NOTE(review): presumably read by the Reader superclass -- confirm there.
  @check_timeout = true
end
Instance Attribute Details
#default_queue ⇒ Object
Returns the value of attribute default_queue.
356 357 358 |
# File 'lib/pwrake/nbio.rb', line 356
# Returns the value of @default_queue.
def default_queue
  @default_queue
end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
355 356 357 |
# File 'lib/pwrake/nbio.rb', line 355
# Returns the value of @queue (the collection of per-channel queues).
def queue
  @queue
end
Instance Method Details
#[](ch) ⇒ Object
358 359 360 |
# File 'lib/pwrake/nbio.rb', line 358
# Looks up the per-channel queue stored at index ch in @queue.
# The result is whatever @queue[ch] yields for that index.
def [](ch)
  @queue[ch]
end
#call ⇒ Object
378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 |
# File 'lib/pwrake/nbio.rb', line 378
# Reads lines with read_line_nonblock until it returns a falsy value and
# routes each one to a queue.
#
# A line of the form "<digits>:<rest>" goes to @queue[<digits>.to_i], with
# only <rest> enqueued. Any other line is enqueued whole on @default_queue.
# Raises RuntimeError when the addressed queue, or the default queue, is absent.
#
# EOFError ends the loop and invokes halt; IO::WaitReadable ends the loop
# silently.
def call
  while (text = read_line_nonblock)
    tagged = /^(\d+):(.*)$/.match(text)
    if tagged
      chan_id = tagged[1]
      payload = tagged[2]
      target = @queue[chan_id.to_i]
      raise "No queue ##{chan_id}, received: #{text}" unless target
      target.enq(payload)
    elsif @default_queue
      @default_queue.enq(text)
    else
      raise "No default_queue, received: #{text}"
    end
  end
rescue EOFError
  halt
rescue IO::WaitReadable
  # Nothing more to read right now; return and wait for the next invocation.
end
#error(e) ⇒ Object
398 399 400 401 402 |
# File 'lib/pwrake/nbio.rb', line 398
# Marks the reader as closed and delivers the error object e to every
# per-channel queue and, when one is set, to the default queue.
#
# The default queue may be absent (call checks for that case too), so it is
# only notified when present instead of failing with NoMethodError after the
# channel queues have already received the error.
def error(e)
  @closed = true
  @queue.each { |q| q.enq(e) }
  @default_queue.enq(e) if @default_queue
end
#get_line(ch = nil) ⇒ Object
Call from a Fiber context.
370 371 372 373 374 375 376 |
# File 'lib/pwrake/nbio.rb', line 370
# Dequeues one item from a queue. Call from a Fiber context.
#
# When ch is given and @queue is non-empty, the item comes from @queue[ch];
# otherwise (no ch, or no per-channel queues at all) it comes from
# @default_queue.
def get_line(ch=nil)
  source = ch && !@queue.empty? ? @queue[ch] : @default_queue
  source.deq
end
#halt ⇒ Object
404 405 406 407 408 |
# File 'lib/pwrake/nbio.rb', line 404
# Halts every per-channel queue and then the default queue, when one is set.
# Emits a debug log line first if a Log constant is defined.
#
# The default queue may be absent (call checks for that case too), so it is
# only halted when present instead of failing with NoMethodError after the
# channel queues have already been halted.
def halt
  Log.debug("Handler.halt") if defined? Log
  @queue.each { |q| q.halt }
  @default_queue.halt if @default_queue
end
#new_queue ⇒ Object
362 363 364 365 366 367 |
# File 'lib/pwrake/nbio.rb', line 362
# Appends a new FiberReaderQueue to @queue and increments the channel count.
#
# Returns a two-element array: the value @n_chan held before the increment,
# and the newly created queue.
def new_queue
  index = @n_chan
  fresh = FiberReaderQueue.new(self)
  @queue << fresh
  @n_chan += 1
  [index, fresh]
end