Class: Langis::Engine::EventMachineEngine

Inherits:
Object
  • Object
show all
Defined in:
lib/langis/engine.rb

Overview

And EventMachine based implementation of a Langis Engine. Its sole job is to take a pumped message into an intake and broadcast the same message to all of the intake’s registered sinks. In essense these engines need to execute the sinks handler methods for each message.

This class leverages EventMachine’s features to easily do efficient publishing to the subscribers, and uses the EventMachineRunner to do the actual code execution.

See Also:

Instance Method Summary collapse

Constructor Details

#initialize(intakes, options = {}) ⇒ EventMachineEngine

Returns a new instance of EventMachineEngine.

Parameters:

  • intakes (Hash{String=>Array<#call>})

    The mapping of intake names to the list of Rackish applications (sinks) that subscribed to the given intake.

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :evm (Object) — default: EventMachine

    Specify a different class/module to use when executing deferred calls. Mainly useful for unit testing, or if you want to run the app directly in the main thread instead of the deferred thread pool.

  • :evm_channel (Class) — default: EventMachine::Channel

    The channel class to instantiate as the underlying pub-sub engine. This is useful for unittesting, or if you want to implement a non-EventMachine pub-sub mechanism.



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/langis/engine.rb', line 110

def initialize(intakes, options={})
  evm_channel_class = options[:evm_channel] || EventMachine::Channel
  @intake_channels = {}
  intakes.each do |intake_name, apps|
    runner_options = {
      :success_channel => options[:success_channel],
      :error_channel => options[:error_channel],
      :evm => options[:evm],
      :intake_name => intake_name
    }
    @intake_channels[intake_name] = channel = evm_channel_class.new
    apps.each do |app|
      channel.subscribe EventMachineRunner.new(app, runner_options)
    end
  end
end

Instance Method Details

#pump(message) ⇒ Object #pump(message, ...) ⇒ Object

Publishes a message into the Langis publish-subscribe bus.

Overloads:

  • #pump(message) ⇒ Object

    Publishes the message to the :default intake.

    Parameters:

    • message (Object)

      The message to publish.

  • #pump(message, ...) ⇒ Object

    Publishes the message to the given list of intakes.

    Parameters:

    • message (Object)

      The message to publish.

    • ... (#to_s)

      Publish the message to these listed intakes



137
138
139
140
141
142
143
# File 'lib/langis/engine.rb', line 137

def pump(message, *intakes)
  intakes.unshift :default if intakes.empty?
  intakes.each do |name|
    channel = @intake_channels[name.to_s]
    channel.push(message) if channel
  end
end