Class: EventMachine::Reactor

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/pr_eventmachine.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Reactor

Returns a new instance of Reactor.



199
200
201
# File 'lib/pr_eventmachine.rb', line 199

# Sets up the reactor's instance state by delegating to
# #initialize_for_run.
def initialize
  initialize_for_run
end

Instance Method Details

#add_selectable(io) ⇒ Object



220
221
222
# File 'lib/pr_eventmachine.rb', line 220

# Registers a selectable with the reactor, keyed by the value of its
# #uuid. An entry already stored under the same uuid is replaced.
#
# @param io [#uuid] the object to register
# @return [Object] the +io+ that was passed in
def add_selectable(io)
  @selectables.store(io.uuid, io)
end

#close_loopbreaker ⇒ Object



286
287
288
289
# File 'lib/pr_eventmachine.rb', line 286

# Closes the write end of the loopbreak pipe and drops the reference to it.
#
# Does nothing when no loopbreaker is open. #open_loopbreaker and
# #signal_loopbreak already check @loopbreak_writer for nil before using it;
# this method now does the same instead of raising NoMethodError.
#
# @return [nil]
def close_loopbreaker
  @loopbreak_writer.close if @loopbreak_writer
  @loopbreak_writer = nil
end

#crank_selectables ⇒ Object



255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/pr_eventmachine.rb', line 255

# Performs one pass of I/O multiplexing over the registered selectables.
#
# Selectables answering true to #select_for_reading? / #select_for_writing?
# are handed to Kernel#select, which waits at most @timer_quantum seconds.
# Ready writers get #eventable_write first, then ready readers get
# #eventable_read. Finally, every selectable reporting #close_scheduled?
# is closed and removed from the registry.
#
# @return [Hash] the @selectables registry (result of Hash#delete_if)
def crank_selectables
  want_read = @selectables.values.select { |sel| sel.select_for_reading? }
  want_write = @selectables.values.select { |sel| sel.select_for_writing? }

  # select returns nil on timeout, in which case both lists stay nil.
  ready_read, ready_write = select(want_read, want_write, nil, @timer_quantum)

  # Writers are serviced before readers.
  ready_write.each { |sel| sel.eventable_write } if ready_write
  ready_read.each { |sel| sel.eventable_read } if ready_read

  @selectables.delete_if do |_uuid, sel|
    next false unless sel.close_scheduled?

    sel.close
    true
  end
end

#get_selectable(uuid) ⇒ Object



224
225
226
# File 'lib/pr_eventmachine.rb', line 224

# Looks up a registered selectable by its uuid.
#
# @param uuid [Object] the key the selectable was registered under
# @return [Object, nil] whatever @selectables holds for +uuid+
def get_selectable(uuid)
  @selectables[uuid]
end

#initialize_for_run ⇒ Object

Called before run, this is a good place to clear out arrays with cruft that may be left over from a previous run.



212
213
214
215
216
217
218
# File 'lib/pr_eventmachine.rb', line 212

# Resets the reactor's bookkeeping: clears the running and stop flags,
# empties the selectable registry, discards all timers, and sets the
# timer quantum to 0.5.
#
# @return [Object] whatever #set_timer_quantum returns
def initialize_for_run
  @running = @stop_scheduled = false

  # Reuse the existing registry hash when there is one, emptying it in
  # place; only allocate a new hash the first time through.
  (@selectables ||= {}).clear

  @timers = []
  set_timer_quantum(0.5)
end

#install_oneshot_timer(interval) ⇒ Object



203
204
205
206
207
208
# File 'lib/pr_eventmachine.rb', line 203

# Schedules a one-shot timer due +interval+ after the current time.
#
# The timer is recorded as a [fire_time, uuid] pair and the timer list
# is re-sorted by fire time, so the earliest timer is always first.
#
# @param interval [Numeric] offset added to Time.now to get the fire time
# @return [Object] the identifier produced by UuidGenerator for this timer
def install_oneshot_timer(interval)
  timer_id = UuidGenerator.generate
  fire_at = Time.now + interval

  @timers.push([fire_at, timer_id])
  @timers.sort! { |lhs, rhs| lhs.first <=> rhs.first }

  timer_id
end

#open_loopbreaker ⇒ Object



280
281
282
283
284
# File 'lib/pr_eventmachine.rb', line 280

# Creates a new loopbreak pipe. Any writer left from a previous pipe is
# closed first. The write end is kept in @loopbreak_writer and the read
# end is wrapped in a LoopbreakReader.
#
# @return [LoopbreakReader] the reader wrapping the pipe's read end
def open_loopbreaker
  previous = @loopbreak_writer
  previous.close if previous

  read_end, write_end = IO.pipe
  @loopbreak_writer = write_end
  LoopbreakReader.new(read_end)
end

#run ⇒ Object

Raises:

(Error) — if the reactor is already running



228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/pr_eventmachine.rb', line 228

# Runs the reactor loop until a stop has been scheduled.
#
# Each pass fires due timers (#run_timers) and then services I/O
# (#crank_selectables); the stop flag is checked before each of the two
# steps.
#
# Teardown lives in an +ensure+ clause. Previously an exception raised
# inside the loop skipped it entirely, leaving @running true (so every
# later call failed with "already running") and leaving the loopbreak
# pipe and all selectables open.
#
# @raise [Error] if the reactor is already running
# @return [false]
def run
  raise Error, "already running" if @running

  @running = true

  begin
    open_loopbreaker

    loop do
      break if @stop_scheduled
      run_timers
      break if @stop_scheduled
      crank_selectables
    end
  ensure
    # open_loopbreaker itself may have failed before a writer existed.
    close_loopbreaker if @loopbreak_writer
    @selectables.each { |_uuid, io| io.close }
    @selectables.clear
    @running = false
  end

  # Keep the historical return value (the result of `@running = false`).
  false
end

#run_timers ⇒ Object



247
248
249
250
251
252
253
# File 'lib/pr_eventmachine.rb', line 247

# Fires every timer whose scheduled time is at or before the moment this
# method was entered. Each due timer is removed from the front of
# @timers and reported through EventMachine.event_callback with an empty
# target string, the TimerFired event code and the timer's uuid.
#
# The head of @timers is re-read on every pass rather than iterating a
# snapshot, because the list is shifted as timers fire.
#
# @return [nil]
def run_timers
  cutoff = Time.now

  until @timers.empty? || @timers.first.first > cutoff
    due = @timers.shift
    EventMachine.event_callback("", TimerFired, due.last)
  end
end

#set_timer_quantum(interval_in_seconds) ⇒ Object



295
296
297
# File 'lib/pr_eventmachine.rb', line 295

# Stores the timer quantum, which #crank_selectables passes to select as
# its timeout.
#
# @param interval_in_seconds [Numeric] the new quantum
# @return [Numeric] the value that was stored
def set_timer_quantum(interval_in_seconds)
  @timer_quantum = interval_in_seconds
end

#signal_loopbreak ⇒ Object



291
292
293
# File 'lib/pr_eventmachine.rb', line 291

# Writes a single '+' to the loopbreak pipe when one is open; does
# nothing otherwise.
#
# @return [Integer, nil] the result of IO#write, or nil when no
#   loopbreak writer exists
def signal_loopbreak
  return unless @loopbreak_writer

  @loopbreak_writer.write('+')
end

#stop ⇒ Object

#stop

Raises:

(Error) — if the reactor is not running



275
276
277
278
# File 'lib/pr_eventmachine.rb', line 275

# Schedules the reactor to stop. Only the stop flag is set here; #run
# checks that flag inside its loop.
#
# @raise [Error] if the reactor is not running
# @return [true]
def stop
  raise Error, "not running" unless @running

  @stop_scheduled = true
end