Class: EventMachine::Reactor

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/pr_eventmachine.rb

Constant Summary collapse

HeartbeatInterval =
2

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeReactor

Returns a new instance of Reactor.



273
274
275
# File 'lib/pr_eventmachine.rb', line 273

def initialize
  initialize_for_run
end

Instance Attribute Details

#current_loop_timeObject (readonly)

Returns the value of attribute current_loop_time.



271
272
273
# File 'lib/pr_eventmachine.rb', line 271

def current_loop_time
  @current_loop_time
end

Instance Method Details

#add_selectable(io) ⇒ Object



299
300
301
# File 'lib/pr_eventmachine.rb', line 299

def add_selectable io
  @selectables[io.uuid] = io
end

#close_loopbreakerObject



402
403
404
405
# File 'lib/pr_eventmachine.rb', line 402

def close_loopbreaker
  @loopbreak_writer.close
  @loopbreak_writer = nil
end

#crank_selectablesObject



356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
# File 'lib/pr_eventmachine.rb', line 356

def crank_selectables
    #$stderr.write 'R'

    readers = @selectables.values.select {|io| io.select_for_reading?}
    writers = @selectables.values.select {|io| io.select_for_writing?}

    s = select( readers, writers, nil, @timer_quantum)

    s and s[1] and s[1].each {|w| w.eventable_write }
    s and s[0] and s[0].each {|r| r.eventable_read }

    @selectables.delete_if {|k,io|
      if io.close_scheduled?
        io.close
        true
      end
    }
end

#get_selectable(uuid) ⇒ Object



303
304
305
# File 'lib/pr_eventmachine.rb', line 303

def get_selectable uuid
  @selectables[uuid]
end

#initialize_for_runObject

Called before run, this is a good place to clear out arrays with cruft that may be left over from a previous run.



289
290
291
292
293
294
295
296
297
# File 'lib/pr_eventmachine.rb', line 289

def initialize_for_run
  @running = false
  @stop_scheduled = false
  @selectables ||= {}; @selectables.clear
  @timers = SortedSet.new # []
  set_timer_quantum(0.1)
  @current_loop_time = Time.now
  @next_heartbeat = @current_loop_time + HeartbeatInterval
end

#install_oneshot_timer(interval) ⇒ Object

– Replaced original implementation 05Dec07, was way too slow because of the sort.



279
280
281
282
283
284
285
# File 'lib/pr_eventmachine.rb', line 279

def install_oneshot_timer interval
  uuid = UuidGenerator::generate
  #@timers << [Time.now + interval, uuid]
  #@timers.sort! {|a,b| a.first <=> b.first}
  @timers.add([Time.now + interval, uuid])
  uuid
end

#open_loopbreakerObject



381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
# File 'lib/pr_eventmachine.rb', line 381

def open_loopbreaker
 # Can't use an IO.pipe because they can't be set nonselectable in Windows.
 # Pick a random localhost UDP port.
  #@loopbreak_writer.close if @loopbreak_writer
  #rd,@loopbreak_writer = IO.pipe
	@loopbreak_reader = UDPSocket.new
	@loopbreak_writer = UDPSocket.new
	bound = false
	100.times {
@loopbreak_port = rand(10000) + 40000
begin
	@loopbreak_reader.bind "localhost", @loopbreak_port
	bound = true
	break
rescue
end
	}
	raise "Unable to bind Loopbreaker" unless bound
	LoopbreakReader.new(@loopbreak_reader)
end

#runObject

Raises:



307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
# File 'lib/pr_eventmachine.rb', line 307

def run
  raise Error.new( "already running" ) if @running
  @running = true

  begin
    open_loopbreaker

    loop {
	@current_loop_time = Time.now

      break if @stop_scheduled
      run_timers
      break if @stop_scheduled
      crank_selectables
      break if @stop_scheduled
      run_heartbeats
    }
  ensure
    close_loopbreaker
    @selectables.each {|k, io| io.close}
    @selectables.clear

    @running = false
  end

end

#run_heartbeatsObject



349
350
351
352
353
354
# File 'lib/pr_eventmachine.rb', line 349

def run_heartbeats
  if @next_heartbeat <= @current_loop_time
    @next_heartbeat = @current_loop_time + HeartbeatInterval
    @selectables.each {|k,io| io.heartbeat}
  end
end

#run_timersObject



334
335
336
337
338
339
340
341
342
343
344
345
346
347
# File 'lib/pr_eventmachine.rb', line 334

def run_timers
  @timers.each {|t|
    if t.first <= @current_loop_time
      @timers.delete t
	EventMachine::event_callback "", TimerFired, t.last
    else
      break
    end
  }
  #while @timers.length > 0 and @timers.first.first <= now
  #  t = @timers.shift
  #  EventMachine::event_callback "", TimerFired, t.last
  #end
end

#set_timer_quantum(interval_in_seconds) ⇒ Object



412
413
414
# File 'lib/pr_eventmachine.rb', line 412

def set_timer_quantum interval_in_seconds
  @timer_quantum = interval_in_seconds
end

#signal_loopbreakObject



407
408
409
410
# File 'lib/pr_eventmachine.rb', line 407

def signal_loopbreak
  #@loopbreak_writer.write '+' if @loopbreak_writer
	@loopbreak_writer.send('+',0,"localhost",@loopbreak_port) if @loopbreak_writer
end

#stopObject

#stop

Raises:



376
377
378
379
# File 'lib/pr_eventmachine.rb', line 376

def stop
  raise Error.new( "not running") unless @running
  @stop_scheduled = true
end