Class: PgStream::Processor

Inherits:
Object
  • Object
show all
Defined in:
lib/pg_stream/processor.rb

Constant Summary collapse

# Callback phases a Processor accepts. Frozen so the shared list cannot be
# mutated at runtime; the per-instance callback arrays are stored separately.
CALLBACK_TYPES =
[:before_execute, :during_execute, :after_execute].freeze

Instance Method Summary collapse

Constructor Details

#initialize(stream) ⇒ Processor

Returns a new instance of Processor.



5
6
7
8
9
10
11
# File 'lib/pg_stream/processor.rb', line 5

# Builds a processor around +stream+.
#
# Stores the stream, creates an empty callback list for every entry in
# CALLBACK_TYPES (each type gets its own Array), and starts the row
# counter at zero.
#
# @param stream the row source; #execute calls +each_row+ on it
def initialize(stream)
  @stream = stream
  @callbacks = CALLBACK_TYPES.each_with_object({}) do |kind, registry|
    registry[kind] = []
  end
  @row_count = 0
end

Instance Method Details

#execute ⇒ Object



24
25
26
27
28
29
30
31
32
# File 'lib/pg_stream/processor.rb', line 24

# Runs the stream through the registered callbacks.
#
# Order of events:
# 1. every :before_execute callback is called with no arguments;
# 2. for each row yielded by +@stream.each_row+, the row counter is
#    incremented and every :during_execute callback is called with
#    (row, current count);
# 3. every :after_execute callback is called with the final count.
#
# The counter is not reset here, so calling this method again continues
# counting from the previous total.
#
# @return the value of the row counter after the stream is exhausted
def execute
  # Dispatches +payload+ to each callback currently registered under +kind+.
  fire = lambda do |kind, *payload|
    @callbacks[kind].each { |hook| hook.call(*payload) }
  end

  fire.call(:before_execute)

  @stream.each_row do |record|
    @row_count += 1
    fire.call(:during_execute, record, @row_count)
  end

  fire.call(:after_execute, @row_count)
  @row_count
end

#register(args) ⇒ Object



13
14
15
16
17
18
19
20
21
22
# File 'lib/pg_stream/processor.rb', line 13

# Registers one or more callbacks.
#
# Every type is validated before anything is stored, so a call that raises
# leaves the existing callbacks untouched (no partial registration).
#
# @param args type => callable pairs; each type must be listed in
#   CALLBACK_TYPES. Callables are invoked with +call+ by #execute.
# @return the full callback table (type => Array of callables)
# @raise [RuntimeError] when any type is not in CALLBACK_TYPES
def register(args)
  # First pass: reject the whole request on the first unknown type.
  args.each do |type, _function|
    next if CALLBACK_TYPES.include?(type)

    raise "#{type} is not an acceptable callback type. Types include #{CALLBACK_TYPES.join(', ')}"
  end

  # Second pass: every type is known, so appending cannot fail midway.
  args.each { |type, function| @callbacks[type] << function }
  @callbacks
end