Class: EmPipelines::Pipeline
- Inherits:
-
Object
- Object
- EmPipelines::Pipeline
- Defined in:
- lib/empipelines/pipeline.rb
Defined Under Namespace
Classes: TerminatorStage
Instance Method Summary collapse
- #for(event_definition) ⇒ Object
-
#initialize(em, context, monitoring) ⇒ Pipeline
constructor
A new instance of Pipeline.
Constructor Details
#initialize(em, context, monitoring) ⇒ Pipeline
Returns a new instance of Pipeline.
9 10 11 12 13 |
# File 'lib/empipelines/pipeline.rb', line 9 def initialize(em, context, monitoring) @em = em @context = context @monitoring = monitoring end |
Instance Method Details
#for(event_definition) ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/empipelines/pipeline.rb', line 15 def for(event_definition) stages = event_definition.map(&instantiate_with_dependencies) monitoring = @monitoring first_stage_process = stages.reverse.reduce(TerminatorStage) do |current_head, next_stage| @em.spawn do || begin monitoring.debug "#{next_stage.class}#notify with #{}}" next_stage.call() do |output| current_head.notify(output) end rescue => exception monitoring.inform_exception!(exception, next_stage, "Message #{} is broken") .broken! end end end @monitoring.inform "Pipeline for event_definition is: #{stages.map(&:class).join('->')}" first_stage_process end |