Class: Quark::MdkRuntime::Actors::MessageDispatcher

Inherits:
DatawireQuarkCore::QuarkObject show all
Extended by:
DatawireQuarkCore::Static
Defined in:
lib/mdk_runtime/actors.rb

Constant Summary

Constants included from DatawireQuarkCore::Static

DatawireQuarkCore::Static::Unassigned

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from DatawireQuarkCore::Static

_lazy_statics, static, unlazy_statics

Methods inherited from DatawireQuarkCore::QuarkObject

#to_s

Constructor Details

#initialize(callLater) ⇒ MessageDispatcher

Returns a new instance of MessageDispatcher.



505
506
507
508
509
510
511
# File 'lib/mdk_runtime/actors.rb', line 505

def initialize(callLater)
    
    self.__init_fields__
    (self).callLater = callLater

    nil
end

Instance Attribute Details

#_deliveringObject

Returns the value of attribute _delivering.



497
498
499
# File 'lib/mdk_runtime/actors.rb', line 497

def _delivering
  @_delivering
end

#_lockObject

Returns the value of attribute _lock.



497
498
499
# File 'lib/mdk_runtime/actors.rb', line 497

def _lock
  @_lock
end

#_queuedObject

Returns the value of attribute _queued.



497
498
499
# File 'lib/mdk_runtime/actors.rb', line 497

def _queued
  @_queued
end

#callLaterObject

Returns the value of attribute callLater.



497
498
499
# File 'lib/mdk_runtime/actors.rb', line 497

def callLater
  @callLater
end

#loggerObject

Returns the value of attribute logger.



497
498
499
# File 'lib/mdk_runtime/actors.rb', line 497

def logger
  @logger
end

Instance Method Details

#__init_fields__Object



672
673
674
675
676
677
678
679
680
681
# File 'lib/mdk_runtime/actors.rb', line 672

def __init_fields__()
    
    self.callLater = nil
    self.logger = ::Quark.quark._getLogger("actors")
    self._queued = ::DatawireQuarkCore::List.new([])
    self._delivering = false
    self._lock = ::DatawireQuarkCore::Lock.new()

    nil
end

#_callQueuedMessage(ignore, message) ⇒ Object



547
548
549
550
551
552
553
# File 'lib/mdk_runtime/actors.rb', line 547

def _callQueuedMessage(ignore, message)
    
    message.deliver()
    return true

    nil
end

#_getClassObject



622
623
624
625
626
627
# File 'lib/mdk_runtime/actors.rb', line 622

def _getClass()
    
    return "mdk_runtime.actors.MessageDispatcher"

    nil
end

#_getField(name) ⇒ Object



629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
# File 'lib/mdk_runtime/actors.rb', line 629

def _getField(name)
    
    if ((name) == ("callLater"))
        return (self).callLater
    end
    if ((name) == ("logger"))
        return (self).logger
    end
    if ((name) == ("_queued"))
        return (self)._queued
    end
    if ((name) == ("_delivering"))
        return (self)._delivering
    end
    if ((name) == ("_lock"))
        return (self)._lock
    end
    return nil

    nil
end

#_queue(inFlight) ⇒ Object

Queue a message for delivery.



558
559
560
561
562
563
564
565
566
567
568
569
570
# File 'lib/mdk_runtime/actors.rb', line 558

def _queue(inFlight)
    
    @logger.debug(("Queued ") + ((inFlight).to_s))
    (self)._lock.acquire()
    ((self)._queued) << (inFlight)
    if (!((self)._delivering))
        (self)._delivering = true
        @callLater.schedule(self)
    end
    (self)._lock.release()

    nil
end

#_setField(name, value) ⇒ Object



651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
# File 'lib/mdk_runtime/actors.rb', line 651

def _setField(name, value)
    
    if ((name) == ("callLater"))
        (self).callLater = ::DatawireQuarkCore.cast(value) { ::Quark.mdk_runtime.actors._CallLater }
    end
    if ((name) == ("logger"))
        (self).logger = value
    end
    if ((name) == ("_queued"))
        (self)._queued = ::DatawireQuarkCore.cast(value) { ::DatawireQuarkCore::List }
    end
    if ((name) == ("_delivering"))
        (self)._delivering = ::DatawireQuarkCore.cast(value) { ::Object }
    end
    if ((name) == ("_lock"))
        (self)._lock = ::DatawireQuarkCore.cast(value) { ::DatawireQuarkCore::Lock }
    end

    nil
end

#onExecute(runtime) ⇒ Object



597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
# File 'lib/mdk_runtime/actors.rb', line 597

def onExecute(runtime)
    
    (self)._lock.acquire()
    while ((((self)._queued).size) > (0)) do
        toDeliver = (self)._queued
        (self)._queued = ::DatawireQuarkCore::List.new([])
        (self)._lock.release()
        idx = 0
        while ((idx) < ((toDeliver).size)) do
            @logger.debug(("Delivering ") + (((toDeliver)[idx]).to_s))
            deliver = ::Quark.quark._BoundMethod.new(self, "_callQueuedMessage", ::DatawireQuarkCore::List.new([(toDeliver)[idx]]))
            success = ::DatawireQuarkCore.cast(::Quark.quark.concurrent.Context.runtime().callSafely(deliver, false)) { ::Object }
            if (!(success))
                @logger.warn(("FAILURE when delivering ") + (((toDeliver)[idx]).to_s))
            end
            idx = (idx) + (1)
        end
        (self)._lock.acquire()
    end
    (self)._delivering = false
    (self)._lock.release()

    nil
end

#pumpObject

Causes message delivery to occur when using manual CallLater



588
589
590
591
592
593
594
595
# File 'lib/mdk_runtime/actors.rb', line 588

def pump()
    
    while (self.queued()) do
        (self).callLater.runAll()
    end

    nil
end

#queuedObject

Are there any messages waiting to be delivered?



575
576
577
578
579
580
581
582
583
# File 'lib/mdk_runtime/actors.rb', line 575

def queued()
    
    (self)._lock.acquire()
    result = (self)._delivering
    (self)._lock.release()
    return result

    nil
end

#startActor(actor) ⇒ Object

Start an Actor.



530
531
532
533
534
535
# File 'lib/mdk_runtime/actors.rb', line 530

def startActor(actor)
    
    self._queue(::Quark.mdk_runtime.actors._StartStopActor.new(actor, self, true))

    nil
end

#stopActor(actor) ⇒ Object

Stop an Actor.



540
541
542
543
544
545
# File 'lib/mdk_runtime/actors.rb', line 540

def stopActor(actor)
    
    self._queue(::Quark.mdk_runtime.actors._StartStopActor.new(actor, self, false))

    nil
end

#tell(origin, message, destination) ⇒ Object

Queue a message from origin to destination, and trigger delivery if necessary.



519
520
521
522
523
524
525
# File 'lib/mdk_runtime/actors.rb', line 519

def tell(origin, message, destination)
    
    inFlight = ::Quark.mdk_runtime.actors._InFlightMessage.new(origin, message, destination)
    self._queue(inFlight)

    nil
end