Class: Utilrb::EventLoop

Inherits:
Object
Defined in:
lib/utilrb/event_loop.rb

Overview

Simple event loop that supports timers and defers blocking operations to a thread pool. The results of deferred operations are queued and processed by the event loop thread at the end of each step.

All events must be code blocks, which are executed at the end of each step. There is no support for event filtering or event propagation.

To integrate Ruby classes into the event loop, Forwardable#def_event_loop_delegator can be used.

Examples:

Example for using the EventLoop

event_loop = EventLoop.new 
event_loop.once do 
  puts "called once"
end

event_loop.every(1.0) do 
  puts "called every second"
end

callback = Proc.new do |result|
  puts result 
end
event_loop.defer callback do
  sleep 2
  "result from the worker thread #{Thread.current}"
end

event_loop.exec

Defined Under Namespace

Modules: Forwardable

Classes: Event, Timer

Constructor Details

#initialize ⇒ EventLoop

Creates a new EventLoop.



# File 'lib/utilrb/event_loop.rb', line 189

def initialize
    @mutex = Mutex.new
    @events = Queue.new               # stores all events for the next step
    @timers = Set.new                 # stores all timers
    @every_cylce_events = Set.new     # stores all events which are added to @events each step
    @on_error = {}                    # stores on error callbacks
    @errors = Queue.new               # stores errors which will be re raised at the end of the step
    @number_of_events_to_process = 0  # number of events which are processed in the current step
    @thread_pool = ThreadPool.new
    @thread = Thread.current #the event loop thread
    @stop = nil
end

Instance Attribute Details

#thread_pool ⇒ Utilrb::ThreadPool (readonly)

Underlying thread pool used to defer work.

Returns:

  • (Utilrb::ThreadPool)

# File 'lib/utilrb/event_loop.rb', line 186

def thread_pool
  @thread_pool
end

Class Method Details

.cleanup_backtrace(&block) ⇒ Object



# File 'lib/utilrb/event_loop.rb', line 176

def self.cleanup_backtrace(&block)
    block.call
rescue
    $@.delete_if{|s| %r"#{Regexp.quote(__FILE__)}"o =~ s}
    ::Kernel::raise
end

Instance Method Details

#add_event(event, every_step = false) ⇒ Object

Adds an Event to the event loop

Parameters:

  • event (Event)

    The event

  • every_step (Boolean) (defaults to: false)

    Automatically added for every step



# File 'lib/utilrb/event_loop.rb', line 641

def add_event(event,every_step = false)
    raise ArgumentError, "cannot add event which is ignored." if event.ignore?
    if every_step
        @mutex.synchronize do
            @every_cylce_events << event
        end
    else
        @events << event
    end
    event
end

#add_task(task) ⇒ Object

Adds a task to the thread pool

Parameters:

  • task

    The task to add to the thread pool

# File 'lib/utilrb/event_loop.rb', line 656

def add_task(task)
    thread_pool << task
end

#add_timer(timer) ⇒ Object

Adds a timer to the event loop

Parameters:

  • timer (Timer)

    The timer.



# File 'lib/utilrb/event_loop.rb', line 630

def add_timer(timer)
    @mutex.synchronize do
        raise "timer #{timer}:#{timer.doc} was already added!" if @timers.include?(timer)
        @timers << timer
    end
end

#async(work, *args) {|result| ... } ⇒ Utilrb::ThreadPool::Task

Integrates a blocking operation call into the EventLoop like #defer, but with a syntax better suited to deferring a method call.

async method(:my_method) do |result,exception|
  if exception
    raise exception
  else
    puts result
  end
end

Parameters:

  • work (#call)

    The proc which will be deferred

Yields:

  • (result)

    The callback

  • (result, exception)

    The callback

Returns:

  • (Utilrb::ThreadPool::Task)

# File 'lib/utilrb/event_loop.rb', line 217

def async(work,*args,&callback)
    async_with_options(work,Hash.new,*args,&callback)
end

#async_every(work, options = Hash.new, *args, &callback) ⇒ EventLoop::Timer

Integrates a blocking operation call like #async, but automatically requeues the call once the period has elapsed and the worker thread has finished the previous run. As a consequence, the call is never requeued if the task blocks forever, and it is never deferred to more than one worker thread at the same time.

Parameters:

  • work (#call)

    The proc which will be deferred

  • options (Hash) (defaults to: Hash.new)

    The options of the task.

  • args (Array)

    The arguments for the code block

  • block (#call)

    The code block

Options Hash (options):

  • :period (Float)

    The period

  • :start (Boolean)

    Starts the timer right away (default = true)

  • :callback (Proc)

    The callback

  • :known_errors (class)

    Known errors which will be rescued

  • :on_error (Proc)

    Callback which is called when an error occurred

  • :sync_key (Object)

    The sync key

  • :default (Object)

    Default value returned when a handled error occurred.

Returns:

  • (EventLoop::Timer)

Raises:

  • (ArgumentError)


# File 'lib/utilrb/event_loop.rb', line 251

def async_every(work,options=Hash.new,*args, &callback)
    options, async_opt = Kernel.filter_options(options,:period,:start => true)
    period = options[:period]
    raise ArgumentError,"No period given" unless period
    task = nil
    every period ,options[:start] do
        if !task
            task = async_with_options(work,async_opt,*args,&callback)
        elsif task.finished?
            add_task task
        end
        task
    end
end
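
The requeue rule described for #async_every can be illustrated without utilrb. The sketch below is only an assumption-laden model: FakeTask and tick are hypothetical names, and FakeTask merely stands in for a thread pool task with a finished? flag. It shows that a tick starts the task once and restarts it only after the previous run has finished.

```ruby
# Hypothetical task with a finished? flag, standing in for a thread pool task.
class FakeTask
  attr_reader :runs

  def initialize
    @runs = 0
    @finished = false
  end

  # Marks the task as running again and counts the run.
  def start
    @runs += 1
    @finished = false
  end

  def finish!
    @finished = true
  end

  def finished?
    @finished
  end
end

task = nil
# One timer tick: create the task on the first tick, restart it only once it finished.
tick = lambda do
  if task.nil?
    task = FakeTask.new
    task.start
  elsif task.finished?
    task.start
  end
  task
end

tick.call          # first tick starts the task
tick.call          # still running: nothing is queued
runs_while_busy = task.runs
task.finish!
tick.call          # finished: the task is started again
```

Because a busy task is skipped rather than queued a second time, a task that never finishes is simply never restarted.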

#async_with_options(work, options = Hash.new, *args) {|result| ... } ⇒ Utilrb::ThreadPool::Task

Integrates a blocking operation call into the EventLoop like #defer, but with a syntax better suited to deferring a method call.

async method(:my_method) do |result,exception|
  if exception
    raise exception
  else
    puts result
  end
end

Parameters:

  • work (#call)

    The proc which will be deferred

  • options (Hash) (defaults to: Hash.new)

    The options

Options Hash (options):

  • :callback (Proc)

    The callback

  • :known_errors (class)

    Known errors which will be rescued

  • :on_error (Proc)

    Callback which is called when an error occurred

  • :sync_key (Object)

    The sync key

  • :default (Object)

    Default value returned when a handled error occurred.

Yields:

  • (result)

    The callback

  • (result, exception)

    The callback

Returns:

  • (Utilrb::ThreadPool::Task)

# File 'lib/utilrb/event_loop.rb', line 224

def async_with_options(work,options=Hash.new,*args,&callback)
    options[:callback] = callback
    defer(options,*args,&work)
end

#backlog ⇒ Fixnum

Number of tasks waiting for execution

Returns:

  • (Fixnum)

    the number of tasks



# File 'lib/utilrb/event_loop.rb', line 567

def backlog
    thread_pool.backlog
end

#call(&block) ⇒ Event, Object

Calls the given block in the event loop thread. If the current thread is the event loop thread, the block is executed right away and the result of the block is returned. Otherwise, a handle to the queued Event is returned.

Returns:

  • (Event, Object)

# File 'lib/utilrb/event_loop.rb', line 374

def call(&block)
    if thread?
        block.call
    else
        once(&block)
    end
end
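
The dispatch rule of #call can be sketched in isolation. InlineOrQueue is a hypothetical stand-in, not utilrb's class, and it returns the symbol :queued where the real method returns an Event handle.

```ruby
require "thread"

# Hypothetical stand-in showing the dispatch rule of #call.
class InlineOrQueue
  attr_reader :events

  def initialize
    @thread = Thread.current   # the thread treated as the loop thread
    @events = Queue.new        # blocks waiting to run on the loop thread
  end

  def call(&block)
    if Thread.current == @thread
      block.call               # on the loop thread: run now, return the result
    else
      @events << block         # elsewhere: queue the block for the loop thread
      :queued
    end
  end
end

dispatcher = InlineOrQueue.new
inline_result = dispatcher.call { 1 + 1 }
other_result = Thread.new { dispatcher.call { 2 + 2 } }.value
queued_result = dispatcher.events.pop.call   # the loop thread runs the queued block
```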

#cancel_timer(timer) ⇒ Object

Cancels the given timer if it is running otherwise it does nothing.

Parameters:

  • timer (Timer)

    The timer



# File 'lib/utilrb/event_loop.rb', line 491

def cancel_timer(timer)
    @mutex.synchronize do
        @timers.delete timer
    end
end

#clear ⇒ Object

Clears all timers, events and errors



# File 'lib/utilrb/event_loop.rb', line 661

def clear
    thread_pool.clear

    @errors.clear
    @events.clear
    @mutex.synchronize do
        @every_cylce_events.clear
        @timers.clear
    end
end

#clear_errors ⇒ Object

Clears all errors which occurred during the last step and are not marked as known. If the errors are not cleared, they are re-raised the next time #step is called.



# File 'lib/utilrb/event_loop.rb', line 674

def clear_errors
    @errors.clear
end

#defer(options = Hash.new, *args, &block) ⇒ ThreadPool::Task

Integrates a blocking operation call into the EventLoop by executing it from a different thread. After the call has returned, the given callback is called from the EventLoop thread while it processes its events.

If the callback has an arity of 2, the exception is passed to the callback as the second parameter in the event of an error. The error is also passed to the error handlers of the event loop, but it is not re-raised if the error is marked as known.

To override an error, the callback can return :ignore_error or a new error instance in the event of an error. The error handlers of the event loop are then either not called at all or called with the new error instance.

Examples:

Ignoring an error

callback = Proc.new do |r,e|
   if e
      :ignore_error
   else
      puts r
   end
end

defer(:callback => callback) do
   raise
end

Parameters:

  • options (Hash) (defaults to: Hash.new)

    The options of the task.

  • args (Array)

    The arguments for the code block

  • block (#call)

    The code block

Options Hash (options):

  • :callback (Proc)

    The callback

  • :known_errors (class)

    Known errors which will be rescued

  • :on_error (Proc)

    Callback which is called when an error occurred

  • :sync_key (Object)

    The sync key

  • :default (Object)

    Default value returned when a handled error occurred.

Returns:

  • (ThreadPool::Task)

# File 'lib/utilrb/event_loop.rb', line 301

def defer(options=Hash.new,*args,&block)
    options, task_options = Kernel.filter_options(options,{:callback => nil,:known_errors => [],:on_error => nil})
    callback = options[:callback]
    error_callback = options[:on_error]
    known_errors = Array(options[:known_errors])

    task = Utilrb::ThreadPool::Task.new(task_options,*args,&block)
    # ensures that user callback is called from main thread and not from worker threads
    if callback
        task.callback do |result,exception|
            once do
                if callback.arity == 1
                    callback.call result if !exception
                else
                    e = callback.call result,exception
                    #check if the error was overwritten in the
                    #case of an error
                    exception = if exception
                                    if e.is_a?(Symbol) && e == :ignore_error
                                        nil
                                    elsif e.is_a? Exception
                                        e
                                    else
                                        exception
                                    end
                                end
                end
                if exception
                    error_callback.call(exception) if error_callback
                    raises = !known_errors.any? {|error| exception.is_a?(error)}
                    handle_error(exception,raises)
                end
            end
        end
    else
        task.callback do |result,exception|
            if exception
                raises = !known_errors.find {|error| exception.is_a?(error)}
                once do
                    error_callback.call(exception) if error_callback
                    handle_error(exception,raises)
                end
            end
        end
    end
    @mutex.synchronize do
        @thread_pool << task
    end
    task
end
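
The core idea behind #defer, running blocking work on another thread and delivering the result to a callback on the loop thread, can be sketched without utilrb. MiniLoop is a hypothetical stand-in: it spawns a plain Thread instead of using a thread pool and omits known errors and error handlers.

```ruby
require "thread"

# Hypothetical stand-in for the defer pattern; not utilrb's implementation.
class MiniLoop
  def initialize
    @events = Queue.new          # callbacks waiting to run on the loop thread
  end

  # Runs the block on a worker thread; the callback is queued for the loop thread.
  def defer(callback, &work)
    Thread.new do
      result, error = nil, nil
      begin
        result = work.call
      rescue StandardError => e
        error = e
      end
      @events << lambda { callback.call(result, error) }
    end
  end

  # Processes the events that are queued right now.
  def step
    @events.size.times { @events.pop.call }
  end
end

loop_thread = Thread.current
seen = []
mini = MiniLoop.new
worker = mini.defer(lambda { |result, error| seen << [result, error, Thread.current] }) { 21 * 2 }
worker.join   # wait until the worker has queued the callback
mini.step     # the callback runs here, on the loop thread
```

The worker never touches the callback directly; it only enqueues it, so user code in the callback does not need to be thread safe.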

#events? ⇒ Boolean

Returns true if events or errors are queued.

Returns:

  • (Boolean)


# File 'lib/utilrb/event_loop.rb', line 385

def events?
    !@events.empty? || !@errors.empty?
end

#every(period, start = true) { ... } ⇒ Utilrb::EventLoop::Timer

Adds a timer to the event loop which will execute the given code block with the given period from the event loop thread.

Parameters:

  • period (Float)

    The period of the timer in seconds

  • start (Boolean) (defaults to: true)

    Starts the timer right away

Yields:

  • The code block.

Returns:

  • (Utilrb::EventLoop::Timer)

# File 'lib/utilrb/event_loop.rb', line 397

def every(period,start=true,&block)
    timer = Timer.new(self,period,&block)
    timer.start if start # adds itself to the event loop
    timer
end

#every_step(&block) ⇒ Event

Executes the given block every step from the event loop thread.

Returns:

  • (Event)

# File 'lib/utilrb/event_loop.rb', line 406

def every_step(&block)
    add_event Event.new(block),true
end

#exec(period = 0.05) { ... } ⇒ Object

Starts the event loop with the given period. If a code block is given, it is executed at the end of each step. This method blocks until #stop is called.

Parameters:

  • period (Float) (defaults to: 0.05)

    The period

Yields:

  • The code block



# File 'lib/utilrb/event_loop.rb', line 515

def exec(period=0.05,&block)
    @stop = false
    reset_timers
    while !@stop
        last_step = Time.now
        step(last_step,&block)
        diff = (Time.now-last_step).to_f
        sleep(period-diff) if diff < period && !@stop
    end
end
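
The sleep at the end of each iteration compensates for the time the step itself took, so that one iteration lasts roughly one period. The arithmetic can be checked in isolation; remaining_sleep is a hypothetical helper, not part of utilrb.

```ruby
# Hypothetical helper: how long to sleep so that one step takes about `period` seconds.
def remaining_sleep(period, step_started, now)
  diff = (now - step_started).to_f
  diff < period ? period - diff : 0.0
end

started = Time.at(100.0)
# A step that took 0.02 s leaves about 0.03 s of a 0.05 s period.
short_step = remaining_sleep(0.05, started, started + 0.02)
# A step that overran the period leaves nothing to sleep.
long_step  = remaining_sleep(0.05, started, started + 0.2)
```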

#handle_error(error, save = true) ⇒ Object



# File 'lib/utilrb/event_loop.rb', line 678

def handle_error(error,save = true)
    call do
        on_error = @mutex.synchronize do
            @on_error.find_all{|key,e| error.is_a? key}.map(&:last).flatten
        end
        on_error.each do |handler|
            handler.call error
        end
        @errors << error if save == true
    end
end

#on_error(error_class) {|exception| ... } ⇒ Object

Errors caught during event loop callbacks are forwarded to registered code blocks. The code block is called from the event loop thread.

Parameters:

  • error_class

    The error class the block should be called for

Yields:

  • (exception)

    The code block



# File 'lib/utilrb/event_loop.rb', line 416

def on_error(error_class,&block)
    @mutex.synchronize do
        @on_error[error_class] ||= []
        @on_error[error_class]  << block
    end
end
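
Handlers registered per error class are matched with is_a?, so a handler for a superclass also receives subclass errors. The following sketch models that lookup with a plain Hash; handlers and dispatch are hypothetical names and the code is not utilrb's implementation.

```ruby
# Hypothetical registry mirroring the idea of on_error and handle_error.
handlers = Hash.new { |hash, key| hash[key] = [] }
handled = []

handlers[ArgumentError] << lambda { |e| handled << [:argument, e.message] }
handlers[StandardError] << lambda { |e| handled << [:standard, e.message] }

# Calls every handler whose registered class matches the error via is_a?.
dispatch = lambda do |error|
  handlers.select { |klass, _| error.is_a?(klass) }.values.flatten.each do |handler|
    handler.call(error)
  end
end

dispatch.call(ArgumentError.new("bad argument"))  # matches both handlers
dispatch.call(RuntimeError.new("boom"))           # matches only StandardError
```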

#on_errors(*error_classes) {|exception| ... } ⇒ Object

Errors caught during event loop callbacks are forwarded to registered code blocks. The code blocks are called from the event loop thread.

Parameters:

  • error_classes

    The error classes the block should be called for

Yields:

  • (exception)

    The code block



# File 'lib/utilrb/event_loop.rb', line 429

def on_errors(*error_classes,&block)
    error_classes.flatten!
    error_classes.each do |error_class|
        on_error(error_class,&block)
    end
end

#once(delay = nil) { ... } ⇒ Utilrb::EventLoop::Timer, Event

Executes the given block in the next step from the event loop thread. Returns a Timer object if a delay is set; otherwise returns a handle to the Event which was queued.

Yields:

  • The code block.

Returns:

  • (Utilrb::EventLoop::Timer, Event)

# File 'lib/utilrb/event_loop.rb', line 358

def once(delay=nil,&block)
    raise ArgumentError, "no block given" unless block
    if delay && delay > 0
        timer = Timer.new(self,delay,true,&block)
        timer.start
    else
        add_event(Event.new(block))
    end
end

#pretty_print(pp) ⇒ Object

:nodoc:



# File 'lib/utilrb/event_loop.rb', line 234

def pretty_print(pp) # :nodoc:
    pp.text "EventLoop "
end

#reraise_error(error) ⇒ Object

Raises:

  • (error)


# File 'lib/utilrb/event_loop.rb', line 576

def reraise_error(error)
    raise error, error.message, error.backtrace + caller(1)
end
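
The three-argument form of raise used here keeps the original message and sets an explicit backtrace. A small standalone sketch of the same idea follows; reraise_with_context is a hypothetical name, and the nil guard on the backtrace is an addition for errors that were never raised.

```ruby
# Re-raises a stored error, appending the current call stack to its backtrace.
def reraise_with_context(error)
  raise error, error.message, (error.backtrace || []) + caller(1)
end

# Capture an error instead of letting it propagate.
stored = begin
  raise ArgumentError, "original failure"
rescue ArgumentError => e
  e
end

# Raise it again later from a different place.
reraised = begin
  reraise_with_context(stored)
rescue ArgumentError => e
  e
end
```

The re-raised error keeps its class and message, while the backtrace shows both where the error originated and where it was raised again.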

#reset_timers(time = Time.now) ⇒ Object

Resets all timers so that they do not fire before their whole period has passed, counting from the given point in time.

Parameters:

  • time (Time) (defaults to: Time.now)

    The time



# File 'lib/utilrb/event_loop.rb', line 501

def reset_timers(time = Time.now)
    @mutex.synchronize do 
        @timers.each do |timer|
            timer.reset time
        end
    end
end

#shutdown ⇒ Object

Shuts down the thread pool



# File 'lib/utilrb/event_loop.rb', line 572

def shutdown()
    thread_pool.shutdown()
end

#step(time = Time.now) { ... } ⇒ Object

Handles all current events and timers. If a code block is given it will be executed at the end.

Parameters:

  • time (Time) (defaults to: Time.now)

    The time the step is executed for.

Yields:

  • The code block



# File 'lib/utilrb/event_loop.rb', line 585

def step(time = Time.now,&block)
    validate_thread
    reraise_error(@errors.shift) if !@errors.empty?

    #copy all work otherwise it would not be allowed to 
    #call any event loop functions from a timer
    timers,call = @mutex.synchronize do
                            @every_cylce_events.delete_if &:ignore?
                            @every_cylce_events.each do |event|
                                add_event event
                            end

                            # check all timers
                            temp_timers = @timers.find_all do |timer|
                                timer.timeout?(time)
                            end
                            # delete single shot timer which elapsed
                            @timers -= temp_timers.find_all(&:single_shot?)
                            [temp_timers,block]
                        end

    # Handle all current events, but not the ones which are added during processing.
    # Step is called recursively if wait_for is used inside an event code block.
    # To make sure that all events and timers are processed in the right order,
    # @number_of_events_to_process and a second timeout check are used.
    @number_of_events_to_process = [@events.size,@number_of_events_to_process].max
    while @number_of_events_to_process > 0
        event = @events.pop
        @number_of_events_to_process -= 1
        handle_errors{event.call} unless event.ignore?
    end
    timers.each do |timer|
        next if timer.stopped?
        handle_errors{timer.call(time)} if timer.timeout?(time)
    end
    handle_errors{call.call} if call
    reraise_error(@errors.shift) if !@errors.empty?
    
    #allow thread pool to take over
    Thread.pass
end
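
A step processes only the events that were queued when it started; events added while it runs wait for the next step. This can be sketched with a plain Queue. run_step is a hypothetical, simplified step without timers or error handling.

```ruby
require "thread"

events = Queue.new
log = []

# Hypothetical single step: process only the events present when the step starts.
run_step = lambda do
  events.size.times { events.pop.call }
end

events << lambda do
  log << :first
  events << lambda { log << :added_during_step }  # queued for the next step
end

run_step.call
after_first_step = log.dup
run_step.call
```

Fixing the number of events up front keeps an event that keeps re-adding itself from starving timers and the rest of the loop.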

#steps(period = 0.05, max_time = 1.0, &block) ⇒ Object

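Steps with the given period until all worker threads are waiting for work and no events are queued, or until max_time has elapsed.

Parameters:

  • period (Float) (defaults to: 0.05)

    The period

  • max_time (Float) (defaults to: 1.0)

    The maximum time in seconds to keep stepping

  • block (see #step)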


# File 'lib/utilrb/event_loop.rb', line 554

def steps(period = 0.05,max_time=1.0,&block)
    start = Time.now
    begin
        last_step = Time.now
        step(last_step,&block)
        time = Time.now
        break if max_time && max_time <= (time-start).to_f
        diff = (time-last_step).to_f
        sleep(period-diff) if diff < period && !@stop
    end while (thread_pool.process? || events?)
end

#stop ⇒ Object

Stops the EventLoop after #exec was called.



# File 'lib/utilrb/event_loop.rb', line 527

def stop
    @stop = true
end

#sync(sync_key, *args) {|*args| ... } ⇒ Object

Processes the given block from the current thread, but ensures that no worker thread executes a task with the same sync_key while the block is being processed.

This is useful for instance member calls which are not thread safe.

Parameters:

  • sync_key (Object)

    The sync key

Yields:

  • (*args)

    The code block

Returns:

  • (Object)

    The result of the code block



# File 'lib/utilrb/event_loop.rb', line 230

def sync(sync_key,*args,&block)
    thread_pool.sync(sync_key,*args,&block)
end

#thread=(thread) ⇒ Object

Sets the event loop thread. By default it is set to the one the EventLoop was started from.

Parameters:

  • thread (Thread)

    The thread



# File 'lib/utilrb/event_loop.rb', line 462

def thread=(thread)
    @mutex.synchronize do
        @thread = thread
    end
end

#thread? ⇒ Boolean

Returns true if the current thread is the event loop thread.

Returns:

  • (Boolean)


# File 'lib/utilrb/event_loop.rb', line 448

def thread?
    @mutex.synchronize do
        if Thread.current == @thread
            true
        else
            false
        end
    end
end

#timer?(timer) ⇒ Boolean

Returns true if the given timer is running.

Parameters:

  • timer (Timer)

    The timer.

Returns:

  • (Boolean)


# File 'lib/utilrb/event_loop.rb', line 472

def timer?(timer)
    @mutex.synchronize do
        @timers.include? timer
    end
end

#timers ⇒ Object

Returns all currently running timers.



# File 'lib/utilrb/event_loop.rb', line 481

def timers
    @mutex.synchronize do
        @timers.dup
    end
end

#validate_thread ⇒ Object

Raises if the current thread is not the event loop thread (by default the one the event loop was started from).

Raises:

  • (RuntimeError)


# File 'lib/utilrb/event_loop.rb', line 440

def validate_thread
    raise "current thread is not the event loop thread" if !thread?
end

#wait_for(period = 0.05, timeout = nil, &block) ⇒ Object

Steps with the given period until the given block returns true.

Parameters:

  • period (Float) (defaults to: 0.05)

    The period

  • timeout (Float) (defaults to: nil)

    The timeout in seconds

Yield Returns:

  • (Boolean)


# File 'lib/utilrb/event_loop.rb', line 537

def wait_for(period=0.05,timeout=nil,&block)
    start = Time.now
    old_stop = @stop
    exec period do
        stop if block.call
        if timeout && timeout <= (Time.now-start).to_f
            raise RuntimeError,"Timeout during wait_for"
        end
    end
    @stop = old_stop
end
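
The polling-with-timeout idea behind #wait_for can be sketched as a standalone helper. poll_until is a hypothetical name; unlike #wait_for it only sleeps between checks and does not process events or timers.

```ruby
# Hypothetical polling helper: calls the block every `period` seconds until it
# returns true, raising if `timeout` seconds pass first.
def poll_until(period = 0.05, timeout = nil)
  start = Time.now
  until yield
    if timeout && timeout <= (Time.now - start).to_f
      raise RuntimeError, "Timeout during poll_until"
    end
    sleep period
  end
  true
end

# The condition becomes true on the third check.
counter = 0
finished = poll_until(0.001, 1.0) { (counter += 1) >= 3 }

# A condition that never becomes true runs into the timeout.
timed_out = begin
  poll_until(0.001, 0.01) { false }
  false
rescue RuntimeError
  true
end
```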