Module: SimpleQuery::Stream::PostgresStream

Included in:
Builder
Defined in:
lib/simple_query/stream/postgres_stream.rb

Instance Method Summary collapse

Instance Method Details

#stream_each_postgres(batch_size, &block) ⇒ Object

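Streams the query's rows in batches through a server-side PostgreSQL cursor, passing each built row object to the block. (The source annotates this method with `rubocop:disable Metrics/MethodLength`.)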



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/simple_query/stream/postgres_stream.rb', line 7

# Streams the rows of the cached query through a PostgreSQL cursor.
#
# Opens a transaction on the raw connection, declares a NO SCROLL cursor over
# +cached_sql+, then repeatedly FETCHes +batch_size+ rows until an empty batch
# comes back. Every fetched row is converted with +build_row_object+ and
# handed to +block+.
#
# On success the cursor is closed and the transaction committed. On any other
# way out of the method (an exception of any class, or a non-local exit such
# as +break+/+throw+ from the caller's block) the transaction is rolled back
# so the connection is not left inside an open transaction.
#
# NOTE(review): +batch_size+ and the SQL text are interpolated directly into
# the statements sent to the server; callers are expected to pass trusted
# values — confirm at the call sites.
#
# @param batch_size [Object] value interpolated after FETCH (presumably an Integer)
# @yield [record] the object returned by +build_row_object+ for each row
# @return [Object] whatever <tt>conn.exec("COMMIT")</tt> returns on success
# @raise [StandardError] re-raises whatever the connection or the block raised
def stream_each_postgres(batch_size, &block)
  select_sql = cached_sql

  conn = ActiveRecord::Base.connection.raw_connection
  cursor_name = "simple_query_cursor_#{object_id}"
  committed = false

  begin
    conn.exec("BEGIN")
    conn.exec("DECLARE #{cursor_name} NO SCROLL CURSOR FOR #{select_sql}")

    loop do
      res = conn.exec("FETCH #{batch_size} FROM #{cursor_name}")
      break if res.ntuples.zero?

      res.each { |pg_row| block.call(build_row_object(pg_row)) }
    end

    conn.exec("CLOSE #{cursor_name}")
    commit_result = conn.exec("COMMIT")
    committed = true
    commit_result
  ensure
    # `ensure` (rather than `rescue StandardError`) also covers `break`/`throw`
    # out of the caller's block and non-StandardError exceptions, all of which
    # would otherwise skip both COMMIT and ROLLBACK.
    unless committed
      begin
        conn.exec("ROLLBACK")
      rescue StandardError
        # Best effort: never let a failed ROLLBACK mask the original error.
        nil
      end
    end
  end
end