Class: PostgreSQLCursor::Cursor

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/postgresql_cursor/cursor.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sql, options = {}) ⇒ Cursor

Public: Start a new PostgreSQL cursor query.

sql - The SQL statement with interpolated values.
options - Hash of processing controls:

while: value    - Exits loop when block does not return this value.
until: value    - Exits loop when block returns this value.
fraction: 0.1..1.0    - The cursor_tuple_fraction (default 1.0)
block_size: 1..n      - The number of rows to fetch per db block fetch
                        Defaults to 1000
with_hold       - Allows the query to remain open across commit points.

Examples

PostgreSQLCursor::Cursor.new("select ....")

Returns the cursor object when called with new.



44
45
46
47
48
49
50
51
# File 'lib/postgresql_cursor/cursor.rb', line 44

# Public: Prepare a cursor for the given SQL. Only instance state is set up
# here; no statement is sent to the connection.
#
# sql     - The SQL statement the cursor will later be declared for.
# options - Hash of processing controls. Keys read by the constructor:
#           :connection - connection to use; when the key is absent,
#                         ::ActiveRecord::Base.connection is looked up.
#           :instances  - truthy selects :each_instance iteration,
#                         otherwise :each_row.
def initialize(sql, options = {})
  @sql, @options = sql, options
  @connection = options.fetch(:connection) { ::ActiveRecord::Base.connection }
  @count = 0
  @iterate = if options[:instances]
    :each_instance
  else
    :each_row
  end
  @batched = false
end

Instance Attribute Details

#connectionObject (readonly)

Returns the value of attribute connection.



27
28
29
# File 'lib/postgresql_cursor/cursor.rb', line 27

# Read-only accessor: returns the connection stored in @connection by the
# constructor.
def connection
  @connection
end

#countObject (readonly)

Returns the value of attribute count.



27
28
29
# File 'lib/postgresql_cursor/cursor.rb', line 27

# Read-only accessor: returns @count. each_tuple resets it to 0 and bumps it
# once per yielded row; each_batch does the same once per yielded batch.
def count
  @count
end

#optionsObject (readonly)

Returns the value of attribute options.



27
28
29
# File 'lib/postgresql_cursor/cursor.rb', line 27

# Read-only accessor: returns the options hash held in @options. The hash
# itself is not copied, and #pluck merges additional keys into it.
def options
  @options
end

#resultObject (readonly)

Returns the value of attribute result.



27
28
29
# File 'lib/postgresql_cursor/cursor.rb', line 27

# Read-only accessor: returns @result, i.e. whatever the most recent
# @connection.execute call made by this cursor returned (nil before any
# statement has been executed).
def result
  @result
end

#sqlObject (readonly)

Returns the value of attribute sql.



27
28
29
# File 'lib/postgresql_cursor/cursor.rb', line 27

# Read-only accessor: returns the SQL statement given to the constructor.
def sql
  @sql
end

Instance Method Details

#cast_types(row) ⇒ Object



227
228
229
# File 'lib/postgresql_cursor/cursor.rb', line 227

# Returns the given row unchanged; no casting is performed here.
def cast_types(row)
  row
end

#closeObject

Public: Closes the cursor



284
285
286
# File 'lib/postgresql_cursor/cursor.rb', line 284

# Public: Closes the cursor by executing a SQL "close" for the cursor name
# held in @cursor on the stored connection.
#
# Returns whatever the connection's #execute returns.
def close
  close_statement = "close #{@cursor}"
  @connection.execute(close_statement)
end

#column_typesObject



231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# File 'lib/postgresql_cursor/cursor.rb', line 231

# Builds (and memoizes in @column_types) a Hash mapping each field name of
# @result to the type object the connection's get_oid_type returns for that
# field's ftype/fmod.
#
# Returns nil when ActiveRecord's major version is below 4, otherwise the
# Hash of field name => type.
def column_types
  return nil if ::ActiveRecord::VERSION::MAJOR < 4
  return @column_types if @column_types

  # get_oid_type is invoked through #send, as contributed by @netrusov
  # (2023-01-18): the same call the PostgreSQL adapter uses. The alternative
  # from @simi, @connection.get_type_map.fetch(ftype), also works.
  @column_types = @result.fields.each_with_index.each_with_object({}) do |(field_name, index), by_name|
    oid = @result.ftype(index)
    modifier = @result.fmod(index)
    by_name[field_name] = @connection.send(:get_oid_type, oid, modifier, field_name)
  end
end

#each(&block) ⇒ Object

Public: Yields each row of the result set to the passed block

Yields the row to the block. The row is a hash with symbolized keys.

{colname: value, ....}

Returns the count of rows processed



86
87
88
89
90
91
92
93
94
# File 'lib/postgresql_cursor/cursor.rb', line 86

# Public: Iterates the result set, dispatching on the configured mode.
#
# @iterate selects row (:each_row), array (:each_array) or, for any other
# value, instance iteration using @type. When @batched is truthy the matching
# *_batch variant is used.
#
# Returns whatever the selected iterator returns.
def each(&block)
  case @iterate
  when :each_row
    return each_row_batch(&block) if @batched

    each_row(&block)
  when :each_array
    return each_array_batch(&block) if @batched

    each_array(&block)
  else
    return each_instance_batch(@type, &block) if @batched

    each_instance(@type, &block)
  end
end

#each_array(&block) ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/postgresql_cursor/cursor.rb', line 103

# Yields each row to the block while @iterate is temporarily switched to
# :each_array (fetch_block consults @iterate to decide whether rows are
# collected via each_row). The previous mode is restored afterwards, even
# when the block raises.
#
# Returns the value of each_tuple.
def each_array(&block)
  previous_mode = @iterate
  @iterate = :each_array
  each_tuple(&block)
ensure
  @iterate = previous_mode
end

#each_array_batch(&block) ⇒ Object



136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/postgresql_cursor/cursor.rb', line 136

# Yields each batch to the block while @iterate is temporarily switched to
# :each_array (fetch_block consults @iterate to decide whether rows are
# collected via each_row). The previous mode is restored afterwards, even
# when the block raises.
#
# Returns the value of each_batch.
def each_array_batch(&block)
  previous_mode = @iterate
  @iterate = :each_array
  each_batch(&block)
ensure
  @iterate = previous_mode
end

#each_batch(&block) ⇒ Object

:nodoc:



207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# File 'lib/postgresql_cursor/cursor.rb', line 207

# Internal: Opens the cursor and yields each non-empty block of rows returned
# by fetch_block to the given block.
#
# Iteration stops when fetch_block returns a falsy or empty batch, when the
# :until option is present and the block returns that value, or when the
# :while option is present and the block returns anything else. Everything
# runs inside with_optional_transaction, and the cursor is closed afterwards
# if @block is set and the connection reports active?.
#
# Returns the number of batches yielded (also stored in @count).
def each_batch(&block) # :nodoc:
  stop_on_value = @options.key?(:until)
  continue_on_value = @options.key?(:while)
  @count = 0
  @column_types = nil # field types are re-derived for each run
  with_optional_transaction do
    open
    while (batch = fetch_block) && !batch.empty?
      @count += 1
      outcome = block.call(batch)
      break if stop_on_value && outcome == @options[:until]
      break if continue_on_value && outcome != @options[:while]
    end
  ensure
    close if @block && connection.active?
  end
  @count
end

#each_instance(klass = nil, &block) ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/postgresql_cursor/cursor.rb', line 116

# Yields one model instance per row.
#
# klass - class whose (possibly private) instantiate method builds the model;
#         falls back to @type when not given.
#
# On ActiveRecord 4+ the column types from #column_types are passed along to
# instantiate; on older versions only the row is passed.
#
# Returns the value of each_tuple.
def each_instance(klass = nil, &block)
  model_class = klass || @type
  each_tuple do |row|
    instance =
      if ::ActiveRecord::VERSION::MAJOR < 4
        model_class.send(:instantiate, row)
      else
        @column_types ||= column_types
        model_class.send(:instantiate, row, @column_types)
      end
    block.call(instance)
  end
end

#each_instance_batch(klass = nil, &block) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/postgresql_cursor/cursor.rb', line 149

# Yields one array of model instances per fetched batch.
#
# klass - class whose (possibly private) instantiate method builds each
#         model; falls back to @type when not given.
#
# On ActiveRecord 4+ the column types from #column_types are passed along to
# instantiate; on older versions only the row is passed.
#
# Returns the value of each_batch.
def each_instance_batch(klass = nil, &block)
  model_class = klass || @type
  each_batch do |rows|
    instances = rows.map do |row|
      next model_class.send(:instantiate, row) if ::ActiveRecord::VERSION::MAJOR < 4

      @column_types ||= column_types
      model_class.send(:instantiate, row, @column_types)
    end
    block.call(instances)
  end
end

#each_row(&block) ⇒ Object



96
97
98
99
100
101
# File 'lib/postgresql_cursor/cursor.rb', line 96

# Yields each row to the block, converted with symbolize_keys when the
# :symbolize_keys option is truthy.
#
# Returns the value of each_tuple.
def each_row(&block)
  each_tuple do |tuple|
    yielded = @options[:symbolize_keys] ? tuple.symbolize_keys : tuple
    block.call(yielded)
  end
end

#each_row_batch(&block) ⇒ Object



129
130
131
132
133
134
# File 'lib/postgresql_cursor/cursor.rb', line 129

# Yields each batch of rows to the block. When the :symbolize_keys option is
# truthy, the rows of the batch are replaced in place by their
# symbolize_keys versions first.
#
# Returns the value of each_batch.
def each_row_batch(&block)
  each_batch do |rows|
    if @options[:symbolize_keys]
      rows.map! { |row| row.symbolize_keys }
    end
    block.call(rows)
  end
end

#each_tuple(&block) ⇒ Object

:nodoc:



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/postgresql_cursor/cursor.rb', line 185

# Internal: Opens the cursor and yields each row returned by fetch to the
# given block.
#
# Iteration stops when fetch returns a falsy or empty row, when the :until
# option is present and the block returns that value, or when the :while
# option is present and the block returns anything else. Everything runs
# inside with_optional_transaction, and the cursor is closed afterwards if
# @block is set and the connection reports active?. Exceptions raised while
# iterating propagate to the caller after that cleanup.
#
# Returns the number of rows yielded (also stored in @count).
def each_tuple(&block) # :nodoc:
  has_do_until = @options.key?(:until)
  has_do_while = @options.key?(:while)
  @count = 0
  @column_types = nil # field types are re-derived for each run
  with_optional_transaction do
    open
    while (row = fetch)
      break if row.empty?
      @count += 1
      rc = block.call(row)
      break if has_do_until && rc == @options[:until]
      break if has_do_while && rc != @options[:while]
    end
  ensure
    close if @block && connection.active?
  end
  @count
end

#fetch(options = {}) ⇒ Object

Public: Returns the next row from the cursor, or nil once the end of the results has been reached

Returns a row as a hash of ‘colname’=>value,…



263
264
265
266
267
268
269
# File 'lib/postgresql_cursor/cursor.rb', line 263

# Public: Returns the next row from the cursor.
#
# Opens the cursor first when no block buffer exists yet, and refills the
# buffer through fetch_block whenever it is empty.
#
# options - Hash; when :symbolize_keys is truthy the row is converted with
#           symbolize_keys before being returned. This is the method's own
#           argument, not the cursor-wide @options.
#
# Returns the row, or nil when the buffer is still empty after refilling.
def fetch(options = {})
  open unless @block
  fetch_block if @block.empty?
  next_row = @block.shift
  return next_row unless next_row && options[:symbolize_keys]

  next_row.symbolize_keys
end

#fetch_block(block_size = nil) ⇒ Object

Private: Fetches the next block of rows into @block



272
273
274
275
276
277
278
279
280
281
# File 'lib/postgresql_cursor/cursor.rb', line 272

# Private: Fetches the next block of rows from the cursor into @block.
#
# block_size - number of rows to request; when nil, the memoized @block_size
#              is used, which itself comes from the :block_size option
#              (1000 when the option is absent).
#
# The raw result is kept in @result. In :each_array mode the rows are taken
# from the result's each_row, otherwise from iterating the result itself.
#
# Returns the Array of rows now held in @block.
def fetch_block(block_size = nil)
  row_limit = block_size || (@block_size ||= @options.fetch(:block_size, 1000))
  @result = @connection.execute("fetch #{row_limit} from #{@cursor}")

  row_source = @iterate == :each_array ? @result.each_row : @result
  @block = row_source.to_a
end

#iterate_batched(batched = true) ⇒ Object



69
70
71
72
# File 'lib/postgresql_cursor/cursor.rb', line 69

# Switches batch iteration on or off for subsequent #each calls.
#
# batched - value stored in @batched (defaults to true).
#
# Returns self so calls can be chained.
def iterate_batched(batched = true)
  tap { @batched = batched }
end

#iterate_type(type = nil) ⇒ Object

Specify the type to instantiate, or reset to return a Hash.

Explicitly check for type class to prevent calling equality operator on active record relation, which will load it.



57
58
59
60
61
62
63
64
65
66
67
# File 'lib/postgresql_cursor/cursor.rb', line 57

# Specify the type to instantiate, or reset to returning Hash rows.
#
# type - nil or the Hash class selects :each_row, the Array class selects
#        :each_array; anything else is stored in @type and selects
#        :each_instance.
#
# instance_of?(Class) is checked before == so that the equality operator is
# never invoked on an ActiveRecord relation, which would load it.
#
# Returns self so calls can be chained.
def iterate_type(type = nil)
  @iterate =
    if type.nil? || (type.instance_of?(Class) && type == Hash)
      :each_row
    elsif type.instance_of?(Class) && type == Array
      :each_array
    else
      @type = type
      :each_instance
    end
  self
end

#openObject

Public: Opens (actually, “declares”) the cursor. Call this before fetching



252
253
254
255
256
257
258
# File 'lib/postgresql_cursor/cursor.rb', line 252

# Public: Opens (actually, "declares") the cursor. Call this before fetching.
#
# Calls set_cursor_tuple_fraction, picks the cursor name (the :cursor_name
# option, or "cursor_" followed by a dash-less random UUID), then declares a
# no-scroll cursor for @sql, adding "with hold" when the :with_hold option is
# truthy. The declare result is kept in @result.
#
# Returns the fresh, empty row buffer assigned to @block.
def open
  set_cursor_tuple_fraction
  @cursor = @options[:cursor_name] || "cursor_#{SecureRandom.uuid.delete("-")}"
  hold_clause = @options[:with_hold] ? "with hold " : ""
  declare_sql = "declare #{@cursor} no scroll cursor #{hold_clause}for #{@sql}"
  @result = @connection.execute(declare_sql)
  @block = []
end

#pluck(*cols) ⇒ Object

Returns an array of columns plucked from the result rows. Experimental function, as this could still use too much memory and negate the purpose of this library. Should this return a lazy enumerator instead?



168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/postgresql_cursor/cursor.rb', line 168

# Returns an array of columns plucked from the result rows.
#
# cols - column names (Strings or Symbols). A trailing Hash is treated as
#        options: it is merged into the cursor's @options, and its :class
#        entry, when present, is passed to iterate_type.
#
# :symbolize_keys is switched on in @options, and Hash rows are symbolized
# before lookup, so columns are always looked up by Symbol.
#
# Returns an Array with one entry per row: the column's value itself when a
# single column was requested, otherwise an Array of the requested values.
# Values that are themselves Arrays are returned intact.
def pluck(*cols)
  options = cols.last.is_a?(Hash) ? cols.pop : {}
  @options.merge!(options)
  @options[:symbolize_keys] = true
  iterate_type(options[:class]) if options[:class]
  cols = cols.map(&:to_sym)
  single_column = cols.size == 1
  result = []

  each do |row|
    row = row.symbolize_keys if row.is_a?(Hash)
    # Take the lone value directly rather than wrapping it and flattening
    # afterwards: a recursive flatten would also unpack array-valued columns
    # and merge them across rows.
    result << (single_column ? row[cols.first] : cols.map { |c| row[c] })
  end

  result
end

#set_cursor_tuple_fraction(frac = 1.0) ⇒ Object

Private: Sets the PostgreSQL cursor_tuple_fraction value; 1.0 assumes all rows will be fetched. This is a value between 0.1 and 1.0 (PostgreSQL defaults to 0.1, this library defaults to 1.0) giving the expected fraction (percent) of result rows that will be returned to the caller. The query planner uses this value to choose the access path.



301
302
303
304
305
306
307
# File 'lib/postgresql_cursor/cursor.rb', line 301

# Private: Sets cursor_tuple_fraction on the connection.
#
# frac - fraction to set. When omitted (nil), the :fraction option is used,
#        falling back to 1.0 when that option is absent.
#
# The last value sent is remembered in @cursor_tuple_fraction so that a
# repeated request for the same value does not issue another statement.
# Nothing is remembered before the first call, so the first call always
# sends the value to the server.
#
# Returns the fraction now in effect for this cursor.
def set_cursor_tuple_fraction(frac = nil)
  frac ||= @options.fetch(:fraction, 1.0)
  return frac if frac == @cursor_tuple_fraction

  @cursor_tuple_fraction = frac
  @result = @connection.execute("set cursor_tuple_fraction to  #{frac}")
  frac
end

#sizeObject

ActiveRecord calls #size when rendering a collection. Define it and return a dummy value.



76
77
78
# File 'lib/postgresql_cursor/cursor.rb', line 76

# Always returns the placeholder value -1; no rows are counted here.
def size
  -1
end

#with_optional_transactionObject

Private: Opens a transaction unless the with_hold option is specified



289
290
291
292
293
294
295
# File 'lib/postgresql_cursor/cursor.rb', line 289

# Private: Runs the given block, wrapped in @connection.transaction unless
# the :with_hold option is truthy (in which case the block is run directly).
#
# Returns the block's value when run directly, otherwise whatever
# @connection.transaction returns.
def with_optional_transaction
  return yield if @options[:with_hold]

  @connection.transaction { yield }
end