Module: Inform::Events
- Included in:
- Object, System::Object
- Defined in:
- lib/runtime/events.rb,
lib/runtime/events.rb
Overview
The Inform::Events module
Constant Summary collapse
- REGISTRY =
Struct.new(:memo).new({})
- THREADS_PER_PROCESSOR =
10
- DEFAULT_ADD_EVENT_PARAMS =
{ delay: 0 }.freeze
Class Method Summary collapse
- .active_objects ⇒ Object
-
.available_processors ⇒ Object
rubocop: disable Metrics/AbcSize rubocop: disable Metrics/CyclomaticComplexity rubocop: disable Metrics/MethodLength.
- .each(event_class = Inform::Event, &block) ⇒ Object
- .init_java_pool ⇒ Object
- .init_java_pooled_executor ⇒ Object
- .init_java_scheduled_executor ⇒ Object
- .init_java_scheduler ⇒ Object
- .init_pool ⇒ Object
- .init_scheduled_executor ⇒ Object
- .init_scheduler ⇒ Object
- .object_events ⇒ Object
- .pool ⇒ Object
- .pooled_executor ⇒ Object
- .possibilities ⇒ Object
- .scheduled_executor ⇒ Object
- .scheduler ⇒ Object
Instance Method Summary collapse
- #active? ⇒ Boolean (also: #already_active?)
- #add_event(params = {}, event_params = { cause: self }, &block) ⇒ Object (also: #eventually, #later, #queue, #enqueue)
- #contextualize ⇒ Object
- #delay(delay, &block) ⇒ Object (also: #after_delay, #delayed_event)
- #elementally(ctx, &block) ⇒ Object
-
#event(params = {}, &block) ⇒ Object
(also: #new_event, #now, #react, #respond)
TODO: Figure out a way to determine if an existing event is already finished.
- #events(klass = Inform::Event) ⇒ Object
- #immediately(ctx, &block) ⇒ Object
- #interrupt(event_class = nil) ⇒ Object
- #register_callback(ctx, *args, &callback) ⇒ Object
Class Method Details
.active_objects ⇒ Object
549 550 551 |
# File 'lib/runtime/events.rb', line 549

# Returns the memoized collection of active objects, creating it on first use.
#
# The collection is a java.util.concurrent.ConcurrentLinkedQueue when the
# Java constant is defined, and a plain Array otherwise.
#
# @return [Object] the shared collection
def active_objects
  return @active_objects if @active_objects

  @active_objects =
    if defined?(Java)
      java.util.concurrent.ConcurrentLinkedQueue.new
    else
      []
    end
end
.available_processors ⇒ Object
rubocop: disable Metrics/AbcSize rubocop: disable Metrics/CyclomaticComplexity rubocop: disable Metrics/MethodLength
578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 |
# File 'lib/runtime/events.rb', line 578

# Reports the number of processors available on this host.
#
# Under Java the JVM runtime is asked directly. Otherwise a platform
# specific probe is selected from RbConfig's host_os; whenever that probe
# is unavailable or yields no positive number, Etc.nprocessors is used.
#
# rubocop: disable Metrics/AbcSize
# rubocop: disable Metrics/CyclomaticComplexity
# rubocop: disable Metrics/MethodLength
#
# @return [Integer] the detected processor count
def available_processors
  return java.lang.Runtime.runtime.available_processors if defined?(Java)

  detected =
    case RbConfig::CONFIG['host_os']
    when /darwin9/
      probe_processor_count('hwprefs cpu_count') if executable_on_path?('hwprefs')
    when /darwin/
      if executable_on_path?('hwprefs')
        probe_processor_count('hwprefs thread_count')
      else
        probe_processor_count('sysctl -n hw.ncpu')
      end
    when /linux/
      # Anchored so only the per-CPU "processor" entries are counted.
      probe_processor_count("grep -c '^processor' /proc/cpuinfo")
    when /freebsd/
      probe_processor_count('sysctl -n hw.ncpu')
    when /mswin|mingw/
      require 'win32ole'
      cpu = WIN32OLE.connect("winmgmts://").ExecQuery("select NumberOfCores from Win32_Processor")
      cpu.to_enum.first.NumberOfCores
    end
  return detected if detected

  # Better than nothing
  require 'etc'
  Etc.nprocessors
end

# Runs +command+ and interprets its output as a processor count.
#
# @param command [String] the shell command to execute
# @return [Integer, nil] a positive count, or nil when the command cannot be
#   started or does not print a positive number
def probe_processor_count(command)
  count = `#{command}`.strip.to_i
  count.positive? ? count : nil
rescue SystemCallError
  # Backticks raise (e.g. Errno::ENOENT) when the executable is missing.
  nil
end

# Checks whether an executable called +name+ exists in a PATH directory.
# Done in-process so the check does not depend on `which`, whose output
# also carries a trailing newline that defeats File.exist?.
#
# @param name [String] the executable's file name
# @return [Boolean]
def executable_on_path?(name)
  ENV.fetch('PATH', '').split(File::PATH_SEPARATOR).any? do |dir|
    candidate = File.join(dir, name)
    File.file?(candidate) && File.executable?(candidate)
  end
end
.each(event_class = Inform::Event, &block) ⇒ Object
570 571 572 |
# File 'lib/runtime/events.rb', line 570

# Yields every event registered under +event_class+ to the given block.
#
# The class is passed to #events as an argument; indexing the default
# event list with a class (events[event_class]) is not a valid lookup.
# A nil +event_class+ falls back to Inform::Event, so callers that forward
# an unset class iterate the default list rather than a nil key.
#
# @param event_class [Class, nil] the event class whose list is iterated
# @yieldparam event [Object] each registered event
# @return [Object, nil] the result of iterating the list, or nil when no
#   block is given
def each(event_class = Inform::Event, &block)
  return unless block_given?

  events(event_class || Inform::Event).each(&block)
end
.init_java_pool ⇒ Object
676 677 678 |
# File 'lib/runtime/events.rb', line 676

# Builds the Java-backed pool by wrapping pooled_executor with Guava's
# MoreExecutors.listeningDecorator.
#
# @return [Object] the decorated executor
def init_java_pool
  decorators = com.google.common.util.concurrent.MoreExecutors
  decorators.listeningDecorator(pooled_executor)
end
.init_java_pooled_executor ⇒ Object
641 642 643 |
# File 'lib/runtime/events.rb', line 641

# Creates a fixed-size Java thread pool holding THREADS_PER_PROCESSOR
# threads for each available processor.
#
# Executors is referenced by its fully qualified name, as in
# init_java_scheduled_executor, so no separate import is needed.
#
# @return [Object] the new fixed thread pool
def init_java_pooled_executor
  pool_size = available_processors * THREADS_PER_PROCESSOR
  java.util.concurrent.Executors.newFixedThreadPool(pool_size)
end
.init_java_scheduled_executor ⇒ Object
627 628 629 630 631 632 |
# File 'lib/runtime/events.rb', line 627

# Creates a Java scheduled thread pool holding THREADS_PER_PROCESSOR
# threads for each available processor, with remove-on-cancel enabled.
#
# @return [Object] the configured scheduled executor
def init_java_scheduled_executor
  pool_size = available_processors * THREADS_PER_PROCESSOR
  java.util.concurrent.Executors.newScheduledThreadPool(pool_size).tap do |scheduled|
    scheduled.setRemoveOnCancelPolicy(true)
  end
end
.init_java_scheduler ⇒ Object
659 660 661 |
# File 'lib/runtime/events.rb', line 659

# Builds the Java-backed scheduler by wrapping the scheduled executor with
# Guava's MoreExecutors.listeningDecorator.
#
# The executor comes from the scheduled_executor accessor, mirroring how
# init_java_pool uses pooled_executor; listeningDecorator needs an executor
# instance rather than a constant.
#
# @return [Object] the decorated scheduled executor
def init_java_scheduler
  com.google.common.util.concurrent.MoreExecutors.listeningDecorator(scheduled_executor)
end
.init_pool ⇒ Object
669 670 671 672 673 |
# File 'lib/runtime/events.rb', line 669

# Placeholder for the non-Java pool; nothing is built yet.
#
# TODO: Implement
#   require 'ruby-concurrency'
#   Concurrent::ThreadPoolExecutor.new(available_processors * THREADS_PER_PROCESSOR)
#
# @return [nil]
def init_pool
  nil
end
.init_scheduled_executor ⇒ Object
620 621 622 623 624 |
# File 'lib/runtime/events.rb', line 620

# Placeholder for the non-Java scheduled executor; nothing is built yet.
#
# TODO: Implement
#   require 'ruby-concurrency'
#   Concurrent::ScheduledThreadPoolExecutor.new(available_processors * THREADS_PER_PROCESSOR)
#
# @return [nil]
def init_scheduled_executor
  nil
end
.init_scheduler ⇒ Object
652 653 654 655 656 |
# File 'lib/runtime/events.rb', line 652

# Placeholder for the non-Java scheduler; nothing is built yet.
#
# TODO: Implement
#   require 'ruby-concurrency'
#   Concurrent::ThreadPoolExecutor.new(available_processors * THREADS_PER_PROCESSOR)
#
# @return [nil]
def init_scheduler
  nil
end
.object_events ⇒ Object
559 560 561 562 563 |
# File 'lib/runtime/events.rb', line 559

# Returns the per-object event table stored in Inform::Events.possibilities
# under this object's identity, creating it on first access.
#
# The table is a java.util.concurrent.ConcurrentHashMap when the Java
# constant is defined, and a plain Hash otherwise.
#
# @return [Object] the event table for this object
def object_events
  registry = Inform::Events.possibilities
  registry[identity] ||= (defined?(Java) ? java.util.concurrent.ConcurrentHashMap.new : {})
  # Read the entry back from the registry rather than reusing the assigned value.
  registry[identity]
end
.pool ⇒ Object
664 665 666 |
# File 'lib/runtime/events.rb', line 664

# Returns the pool cached in REGISTRY.memo[:pool], building it on first use
# with init_java_pool when the Java constant is defined and init_pool
# otherwise.
#
# @return [Object, nil] the cached pool
def pool
  cached = REGISTRY.memo[:pool]
  return cached if cached

  REGISTRY.memo[:pool] = defined?(Java) ? init_java_pool : init_pool
end
.pooled_executor ⇒ Object
635 636 637 638 |
# File 'lib/runtime/events.rb', line 635

# Returns the executor cached in REGISTRY.memo[:pooled_executor], building
# it on first use with init_java_pooled_executor when the Java constant is
# defined and init_pooled_executor otherwise.
#
# NOTE(review): init_pooled_executor does not appear in this module's
# method listing — confirm it is defined for the non-Java path.
#
# @return [Object, nil] the cached executor
def pooled_executor
  cached = REGISTRY.memo[:pooled_executor]
  return cached if cached

  REGISTRY.memo[:pooled_executor] =
    defined?(Java) ? init_java_pooled_executor : init_pooled_executor
end
.possibilities ⇒ Object
554 555 556 |
# File 'lib/runtime/events.rb', line 554

# Returns the memoized top-level table that object_events indexes by
# identity, creating it on first use.
#
# The table is a java.util.concurrent.ConcurrentHashMap when the Java
# constant is defined, and a plain Hash otherwise.
#
# @return [Object] the shared table
def possibilities
  return @possibilities if @possibilities

  @possibilities =
    if defined?(Java)
      java.util.concurrent.ConcurrentHashMap.new
    else
      {}
    end
end
Instance Method Details
#active? ⇒ Boolean Also known as: already_active?
746 747 748 |
# File 'lib/runtime/events.rb', line 746

# Tells whether this object currently has any events in its default list.
#
# @return [Boolean] true when #events is not empty
def active?
  return false if events.empty?

  true
end
#add_event(params = {}, event_params = { cause: self }, &block) ⇒ Object Also known as: eventually, later, queue, enqueue
698 699 700 701 702 703 704 705 706 707 |
# File 'lib/runtime/events.rb', line 698

# Queues an event for this object.
#
# When no event is running on the current thread (Thread.current[:event]
# is nil) a new Inform::Event is created. Otherwise the new event is
# chained onto the running one, with DEFAULT_ADD_EVENT_PARAMS filling in
# anything neither hash supplies.
#
# In both paths a bare command (anything that does not respond to #merge)
# is wrapped as { command: ... }, the :command entry is stored under
# :name, and +params+ take precedence over +event_params+. The hashes
# passed by the caller are copied first and never modified.
#
# @param params [Hash, Object] event options, or a bare command
# @param event_params [Hash] base parameters (defaults to { cause: self })
# @yield the event body, forwarded to the event
# @return [Object] the new Inform::Event, or the result of chaining
def add_event(params = {}, event_params = { cause: self }, &block)
  params = params.respond_to?(:merge) ? params.dup : { command: params }
  event_params = event_params.dup
  event_params[:name] = params.delete(:command) if params.include?(:command)

  running = Thread.current[:event]
  return Inform::Event.new(event_params.merge(params), &block) if running.nil?

  # Precedence: defaults < event_params < params.
  running.chain(DEFAULT_ADD_EVENT_PARAMS.merge(event_params).merge(params), &block)
end
#contextualize ⇒ Object
742 743 744 |
# File 'lib/runtime/events.rb', line 742

# Asks the event running on the current thread, if any, to contextualize
# this object.
#
# @return [Object, nil] the event's answer, or nil when no event is running
def contextualize
  running = Thread.current[:event]
  running.nil? ? nil : running.contextualize(self)
end
#delay(delay, &block) ⇒ Object Also known as: after_delay, delayed_event
728 729 730 |
# File 'lib/runtime/events.rb', line 728

# Creates an Inform::Event caused by this object and carrying the given
# delay value.
#
# @param delay [Object] value stored under the event's :delay key
# @yield the event body, forwarded to the event
# @return [Inform::Event] the new event
def delay(delay, &block)
  options = { cause: self, delay: delay }
  Inform::Event.new(options, &block)
end
#elementally(ctx, &block) ⇒ Object
738 739 740 |
# File 'lib/runtime/events.rb', line 738

# Creates an Inform::Event of type :elemental, caused by this object, with
# the given context and when: :immediately.
#
# @param ctx [Object] value stored under the event's :context key
# @yield the event body, forwarded to the event
# @return [Inform::Event] the new event
def elementally(ctx, &block)
  options = { cause: self, context: ctx, type: :elemental, when: :immediately }
  Inform::Event.new(options, &block)
end
#event(params = {}, &block) ⇒ Object Also known as: new_event, now, react, respond
TODO: Figure out a way to determine if an existing event is already finished. That is, find a way to implicitly determine if any existing event is in the react_after phase instead of the react_before phase.
717 718 719 720 721 722 |
# File 'lib/runtime/events.rb', line 717

# Creates a new Inform::Event caused by this object.
#
# A bare command (anything that does not respond to #merge) is wrapped as
# { command: ... }; the :command entry is then stored under :name. The
# caller's hash is copied first, so it is never modified.
#
# TODO: Figure out a way to determine if an existing event is already
# finished. That is, find a way to implicitly determine if any existing
# event is in the react_after phase instead of the react_before phase.
#
# @param params [Hash, Object] event options, or a bare command
# @yield the event body, forwarded to the event
# @return [Inform::Event] the new event
def event(params = {}, &block)
  params = params.respond_to?(:merge) ? params.dup : { command: params }
  event_params = { cause: self }
  event_params[:name] = params.delete(:command) if params.include?(:command)
  Inform::Event.new(event_params.merge(params), &block)
end
#events(klass = Inform::Event) ⇒ Object
566 567 568 |
# File 'lib/runtime/events.rb', line 566

# Returns this object's list of events of the given class, creating the
# list inside object_events on first access.
#
# The list is a ConcurrentLinkedQueue when that constant is defined, and a
# plain Array otherwise.
#
# @param klass [Class] the event class used as the lookup key
# @return [Object] the list of events for +klass+
def events(klass = Inform::Event)
  table = object_events
  table[klass] ||= (defined?(ConcurrentLinkedQueue) ? ConcurrentLinkedQueue.new : [])
end
#immediately(ctx, &block) ⇒ Object
734 735 736 |
# File 'lib/runtime/events.rb', line 734

# Creates an Inform::Event caused by this object, with the given context
# and when: :immediately.
#
# @param ctx [Object] value stored under the event's :context key
# @yield the event body, forwarded to the event
# @return [Inform::Event] the new event
def immediately(ctx, &block)
  options = { cause: self, context: ctx, when: :immediately }
  Inform::Event.new(options, &block)
end
#interrupt(event_class = nil) ⇒ Object
751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 |
# File 'lib/runtime/events.rb', line 751

# Cancels the future of every event yielded by Inform::Events.each for
# +event_class+, except the event running on the current thread.
#
# @param event_class [Class, nil] forwarded to Inform::Events.each
# @return [false] always
def interrupt(event_class = nil)
  source = Thread.current[:event]
  Inform::Events.each(event_class) do |candidate|
    # Parameterizing Future#cancel with false is supposed to abort the
    # future without interrupting the executing thread. Not interrupting
    # the event future's thread may have the consequence of a variety of
    # data consistency issues.
    # TODO: Test
    #
    # The source of an interruption is never terminated artificially.
    candidate.future.cancel(false) unless candidate == source
  end
  false
end
#register_callback(ctx, *args, &callback) ⇒ Object
691 692 693 694 695 696 |
# File 'lib/runtime/events.rb', line 691

# Chains a callback onto the event running on the current thread, passing
# the context and extra arguments as { context:, args: }. Does nothing when
# no event is running.
#
# @param ctx [Object] value stored under :context
# @param args [Array] values stored under :args
# @yield the callback, forwarded to the event's chain
# @return [true] always
def register_callback(ctx, *args, &callback)
  running = Thread.current[:event]
  running&.chain({ context: ctx, args: args }, &callback)
  true
end