Class: DatawireQuarkCore::Eventor

Inherits:
Object
  • Object
show all
Defined in:
lib/datawire-quark-core.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(runtime) ⇒ Eventor

Returns a new instance of Eventor.



490
491
492
493
494
495
496
497
# File 'lib/datawire-quark-core.rb', line 490

def initialize(runtime)
  @executor = Concurrent::SingleThreadExecutor.new
  @timers = Concurrent::TimerSet.new(:executor => @executor)
  @sources = Sources.new
  @runtime = runtime
  at_exit { wait_for_sources }
  @log = Logger.new "quark.runtime"
end

Instance Attribute Details

#runtimeObject (readonly)

Returns the value of attribute runtime.



499
500
501
# File 'lib/datawire-quark-core.rb', line 499

def runtime
  @runtime
end

Instance Method Details

#add(source) ⇒ Object



501
502
503
# File 'lib/datawire-quark-core.rb', line 501

def add(source)
  @sources.add source
end

#event(final: nil, &block) ⇒ Object



505
506
507
# File 'lib/datawire-quark-core.rb', line 505

def event(final:nil, &block)
  @executor.post { execute(final, block) }
end

#execute(final, block) ⇒ Object



509
510
511
512
513
514
515
516
517
518
# File 'lib/datawire-quark-core.rb', line 509

def execute(final, block)
  begin
    block.call()
  rescue => ex
    puts "aieee", ex, ex.backtrace
    @log.error ex
  ensure
    @sources.remove final if final
  end
end

#schedule(delay, &block) ⇒ Object



520
521
522
# File 'lib/datawire-quark-core.rb', line 520

def schedule(delay, &block)
  @timers.post(delay, &block)
end

#wait_for_sourcesObject



524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
# File 'lib/datawire-quark-core.rb', line 524

def wait_for_sources
  last = Time.new - 10
  delta = 1
  while not @sources.empty?
    now = Time.new
    if now - last > delta
      @sources.explain
      last = now
      if delta < 60
        delta = delta * 1.41
      end
    end
    sleep 0.1
  end
rescue Interrupt
  #@log.warn "Interrupted"
end