Class: Datapipes::Sink

Inherits:
Object
  • Object
show all
Includes:
Composable
Defined in:
lib/datapipes/sink.rb

Overview

Build your own sink logic in ‘run` method.

Be careful each sinks are executed concurrently. Avoid race condition in multi sinks.

This is bad:

$shared = []

class A < Datapipes::Sink
  def run(data)
    $shared << data
  end
end

class B < Datapipes::Sink
  def run(data)
    $shared << data
  end
end

On the other hand, a sink is called serially. So you can touch shared object in one sink logic.

This is good:

class A < Datapipes::Source
  def initialize
    @shared = []
  end

  def run(data)
    @shared << data
  end
end

Instance Attribute Summary

Attributes included from Composable

#accumulated

Instance Method Summary collapse

Methods included from Composable

#+

Instance Method Details

#run(data) ⇒ Object

Override this in sub class



42
43
44
# File 'lib/datapipes/sink.rb', line 42

def run(data)
  data
end

#run_all(data) ⇒ Object

For internal uses.



47
48
49
50
51
52
53
# File 'lib/datapipes/sink.rb', line 47

def run_all(data)
  @accumulated ||= [self]
  count = Parallel.processor_count
  Parallel.each(@accumulated, in_threads: count) do |sink|
    sink.run(data)
  end
end