Class: Datapipes::Sink
- Inherits:
-
Object
- Object
- Datapipes::Sink
- Includes:
- Composable
- Defined in:
- lib/datapipes/sink.rb
Overview
Build your own sink logic in ‘run` method.
Be careful each sinks are executed concurrently. Avoid race condition in multi sinks.
This is bad:
$shared = []
class A < Datapipes::Sink
def run(data)
$shared << data
end
end
class B < Datapipes::Sink
def run(data)
$shared << data
end
end
On the other hand, a sink is called serially. So you can touch shared object in one sink logic.
This is good:
class A < Datapipes::Source
def initialize
@shared = []
end
def run(data)
@shared << data
end
end
Instance Attribute Summary
Attributes included from Composable
Instance Method Summary collapse
-
#run(data) ⇒ Object
Override this in sub class.
-
#run_all(data) ⇒ Object
For internal uses.
Methods included from Composable
Instance Method Details
#run(data) ⇒ Object
Override this in sub class
42 43 44 |
# File 'lib/datapipes/sink.rb', line 42 def run(data) data end |
#run_all(data) ⇒ Object
For internal uses.
47 48 49 50 51 52 53 |
# File 'lib/datapipes/sink.rb', line 47 def run_all(data) @accumulated ||= [self] count = Parallel.processor_count Parallel.each(@accumulated, in_threads: count) do |sink| sink.run(data) end end |