Class: Quark::MdkRuntime::Actors::MessageDispatcher
- Inherits:
-
DatawireQuarkCore::QuarkObject
- Object
- DatawireQuarkCore::QuarkObject
- Quark::MdkRuntime::Actors::MessageDispatcher
- Extended by:
- DatawireQuarkCore::Static
- Defined in:
- lib/mdk_runtime/actors.rb
Constant Summary
Constants included from DatawireQuarkCore::Static
DatawireQuarkCore::Static::Unassigned
Instance Attribute Summary collapse
-
#_delivering ⇒ Object
Returns the value of attribute _delivering.
-
#_lock ⇒ Object
Returns the value of attribute _lock.
-
#_queued ⇒ Object
Returns the value of attribute _queued.
-
#callLater ⇒ Object
Returns the value of attribute callLater.
-
#logger ⇒ Object
Returns the value of attribute logger.
Instance Method Summary collapse
- #__init_fields__ ⇒ Object
- #_callQueuedMessage(ignore, message) ⇒ Object
- #_getClass ⇒ Object
- #_getField(name) ⇒ Object
-
#_queue(inFlight) ⇒ Object
Queue a message for delivery.
- #_setField(name, value) ⇒ Object
-
#initialize(callLater) ⇒ MessageDispatcher
constructor
A new instance of MessageDispatcher.
- #onExecute(runtime) ⇒ Object
-
#pump ⇒ Object
Causes message delivery to occur when using manual CallLater.
-
#queued ⇒ Object
Are there any messages waiting to be delivered?.
-
#startActor(actor) ⇒ Object
Start an Actor.
-
#stopActor(actor) ⇒ Object
Stop an Actor.
-
#tell(origin, message, destination) ⇒ Object
Queue a message from origin to destination, and trigger delivery if necessary.
Methods included from DatawireQuarkCore::Static
_lazy_statics, static, unlazy_statics
Methods inherited from DatawireQuarkCore::QuarkObject
Constructor Details
#initialize(callLater) ⇒ MessageDispatcher
Returns a new instance of MessageDispatcher.
505 506 507 508 509 510 511 |
# File 'lib/mdk_runtime/actors.rb', line 505 def initialize(callLater) self.__init_fields__ (self).callLater = callLater nil end |
Instance Attribute Details
#_delivering ⇒ Object
Returns the value of attribute _delivering.
497 498 499 |
# File 'lib/mdk_runtime/actors.rb', line 497 def _delivering @_delivering end |
#_lock ⇒ Object
Returns the value of attribute _lock.
497 498 499 |
# File 'lib/mdk_runtime/actors.rb', line 497 def _lock @_lock end |
#_queued ⇒ Object
Returns the value of attribute _queued.
497 498 499 |
# File 'lib/mdk_runtime/actors.rb', line 497 def _queued @_queued end |
#callLater ⇒ Object
Returns the value of attribute callLater.
497 498 499 |
# File 'lib/mdk_runtime/actors.rb', line 497 def callLater @callLater end |
#logger ⇒ Object
Returns the value of attribute logger.
497 498 499 |
# File 'lib/mdk_runtime/actors.rb', line 497 def logger @logger end |
Instance Method Details
#__init_fields__ ⇒ Object
672 673 674 675 676 677 678 679 680 681 |
# File 'lib/mdk_runtime/actors.rb', line 672 def __init_fields__() self.callLater = nil self.logger = ::Quark.quark._getLogger("actors") self._queued = ::DatawireQuarkCore::List.new([]) self._delivering = false self._lock = ::DatawireQuarkCore::Lock.new() nil end |
#_callQueuedMessage(ignore, message) ⇒ Object
547 548 549 550 551 552 553 |
# File 'lib/mdk_runtime/actors.rb', line 547 def _callQueuedMessage(ignore, ) .deliver() return true nil end |
#_getClass ⇒ Object
622 623 624 625 626 627 |
# File 'lib/mdk_runtime/actors.rb', line 622 def _getClass() return "mdk_runtime.actors.MessageDispatcher" nil end |
#_getField(name) ⇒ Object
629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 |
# File 'lib/mdk_runtime/actors.rb', line 629 def _getField(name) if ((name) == ("callLater")) return (self).callLater end if ((name) == ("logger")) return (self).logger end if ((name) == ("_queued")) return (self)._queued end if ((name) == ("_delivering")) return (self)._delivering end if ((name) == ("_lock")) return (self)._lock end return nil nil end |
#_queue(inFlight) ⇒ Object
Queue a message for delivery.
558 559 560 561 562 563 564 565 566 567 568 569 570 |
# File 'lib/mdk_runtime/actors.rb', line 558 def _queue(inFlight) @logger.debug(("Queued ") + ((inFlight).to_s)) (self)._lock.acquire() ((self)._queued) << (inFlight) if (!((self)._delivering)) (self)._delivering = true @callLater.schedule(self) end (self)._lock.release() nil end |
#_setField(name, value) ⇒ Object
651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 |
# File 'lib/mdk_runtime/actors.rb', line 651 def _setField(name, value) if ((name) == ("callLater")) (self).callLater = ::DatawireQuarkCore.cast(value) { ::Quark.mdk_runtime.actors._CallLater } end if ((name) == ("logger")) (self).logger = value end if ((name) == ("_queued")) (self)._queued = ::DatawireQuarkCore.cast(value) { ::DatawireQuarkCore::List } end if ((name) == ("_delivering")) (self)._delivering = ::DatawireQuarkCore.cast(value) { ::Object } end if ((name) == ("_lock")) (self)._lock = ::DatawireQuarkCore.cast(value) { ::DatawireQuarkCore::Lock } end nil end |
#onExecute(runtime) ⇒ Object
597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 |
# File 'lib/mdk_runtime/actors.rb', line 597 def onExecute(runtime) (self)._lock.acquire() while ((((self)._queued).size) > (0)) do toDeliver = (self)._queued (self)._queued = ::DatawireQuarkCore::List.new([]) (self)._lock.release() idx = 0 while ((idx) < ((toDeliver).size)) do @logger.debug(("Delivering ") + (((toDeliver)[idx]).to_s)) deliver = ::Quark.quark._BoundMethod.new(self, "_callQueuedMessage", ::DatawireQuarkCore::List.new([(toDeliver)[idx]])) success = ::DatawireQuarkCore.cast(::Quark.quark.concurrent.Context.runtime().callSafely(deliver, false)) { ::Object } if (!(success)) @logger.warn(("FAILURE when delivering ") + (((toDeliver)[idx]).to_s)) end idx = (idx) + (1) end (self)._lock.acquire() end (self)._delivering = false (self)._lock.release() nil end |
#pump ⇒ Object
Causes message delivery to occur when using manual CallLater
588 589 590 591 592 593 594 595 |
# File 'lib/mdk_runtime/actors.rb', line 588 def pump() while (self.queued()) do (self).callLater.runAll() end nil end |
#queued ⇒ Object
Are there any messages waiting to be delivered?
575 576 577 578 579 580 581 582 583 |
# File 'lib/mdk_runtime/actors.rb', line 575 def queued() (self)._lock.acquire() result = (self)._delivering (self)._lock.release() return result nil end |
#startActor(actor) ⇒ Object
Start an Actor.
530 531 532 533 534 535 |
# File 'lib/mdk_runtime/actors.rb', line 530 def startActor(actor) self._queue(::Quark.mdk_runtime.actors._StartStopActor.new(actor, self, true)) nil end |
#stopActor(actor) ⇒ Object
Stop an Actor.
540 541 542 543 544 545 |
# File 'lib/mdk_runtime/actors.rb', line 540 def stopActor(actor) self._queue(::Quark.mdk_runtime.actors._StartStopActor.new(actor, self, false)) nil end |
#tell(origin, message, destination) ⇒ Object
Queue a message from origin to destination, and trigger delivery if necessary.
519 520 521 522 523 524 525 |
# File 'lib/mdk_runtime/actors.rb', line 519 def tell(origin, , destination) inFlight = ::Quark.mdk_runtime.actors._InFlightMessage.new(origin, , destination) self._queue(inFlight) nil end |