Class: EmPipelines::Pipeline

Inherits: Object
Defined in:
lib/empipelines/pipeline.rb

Defined Under Namespace

Classes: TerminatorStage

Instance Method Summary

Constructor Details

#initialize(em, context, monitoring) ⇒ Pipeline

Returns a new instance of Pipeline.



9
10
11
12
13
# File 'lib/empipelines/pipeline.rb', line 9

# Stores the three collaborators handed to the pipeline.
#
# @param em the object that is later asked to +spawn+ one process per stage
# @param context kept in +@context+; not read by this method
# @param monitoring the object later used for debug/inform/exception reporting
def initialize(em, context, monitoring)
  @em, @context, @monitoring = em, context, monitoring
end

Instance Method Details

#for(event_definition) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/empipelines/pipeline.rb', line 15

# Builds the chain of spawned processes that handles one event definition.
#
# Every entry of +event_definition+ is mapped through
# +instantiate_with_dependencies+ to obtain a stage. Each stage is then
# wrapped in a process created by <tt>@em.spawn</tt>; the process calls the
# stage with the incoming message and forwards whatever the stage yields to
# the next process via +notify+. The last stage forwards to TerminatorStage.
#
# If a stage raises a StandardError, the exception is reported through
# +monitoring.inform_exception!+ and the message is flagged with +broken!+;
# nothing is forwarded downstream by the rescue path.
#
# @param event_definition [#map] the ordered stage definitions
# @return [Object] the process wrapping the first stage, or TerminatorStage
#   itself when +event_definition+ is empty
def for(event_definition)
  stages = event_definition.map(&instantiate_with_dependencies)

  # Keep monitoring in a local so the spawned block only relies on closed-over
  # locals rather than on +self+.
  # NOTE(review): presumably @em.spawn may run the block with a different
  # +self+, which would hide @monitoring -- confirm against the spawn
  # implementation.
  monitoring = @monitoring

  # Fold from the last stage backwards so each process knows its downstream.
  first_stage_process = stages.reverse_each.reduce(TerminatorStage) do |downstream, stage|
    @em.spawn do |input_message|
      begin
        monitoring.debug "#{stage.class}#notify with #{input_message}"
        stage.call(input_message) do |output|
          downstream.notify(output)
        end
      rescue => exception
        monitoring.inform_exception!(exception, stage, "Message #{input_message} is broken")
        input_message.broken!
      end
    end
  end
  @monitoring.inform "Pipeline for event_definition is: #{stages.map(&:class).join('->')}"
  first_stage_process
end