Class: PostgreSQLCursor::Cursor

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/postgresql_cursor/cursor.rb

Constant Summary collapse

@@cursor_seq =
0

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sql, options = {}) ⇒ Cursor

Public: Start a new PostgreSQL cursor query sql - The SQL statement with interpolated values options - hash of processing controls

while: value    - Exits loop when block does not return this value.
until: value    - Exits loop when block returns this value.
fraction: 0.1..1.0    - The cursor_tuple_fraction (default 1.0)
block_size: 1..n      - The number of rows to fetch per db block fetch
                        Defaults to 1000

Examples

PostgreSQLCursor::Cursor.new("select ....")

Returns the cursor object when called with new.



39
40
41
42
43
44
45
# File 'lib/postgresql_cursor/cursor.rb', line 39

def initialize(sql, options={})
  @sql        = sql
  @options    = options
  @connection = @options.fetch(:connection) { ::ActiveRecord::Base.connection }
  @count      = 0
  @iterate    = options[:instances] ? :each_instance : :each_row
end

Instance Attribute Details

#connectionObject (readonly)

Returns the value of attribute connection.



22
23
24
# File 'lib/postgresql_cursor/cursor.rb', line 22

def connection
  @connection
end

#countObject (readonly)

Returns the value of attribute count.



22
23
24
# File 'lib/postgresql_cursor/cursor.rb', line 22

def count
  @count
end

#optionsObject (readonly)

Returns the value of attribute options.



22
23
24
# File 'lib/postgresql_cursor/cursor.rb', line 22

def options
  @options
end

#resultObject (readonly)

Returns the value of attribute result.



22
23
24
# File 'lib/postgresql_cursor/cursor.rb', line 22

def result
  @result
end

#sqlObject (readonly)

Returns the value of attribute sql.



22
23
24
# File 'lib/postgresql_cursor/cursor.rb', line 22

def sql
  @sql
end

Instance Method Details

#cast_types(row) ⇒ Object



137
138
139
# File 'lib/postgresql_cursor/cursor.rb', line 137

def cast_types(row)
  row
end

#closeObject

Public: Closes the cursor



183
184
185
# File 'lib/postgresql_cursor/cursor.rb', line 183

def close
  @connection.execute("close cursor_#{@cursor}")
end

#column_typesObject



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/postgresql_cursor/cursor.rb', line 141

def column_types
  return nil if ::ActiveRecord::VERSION::MAJOR < 4
  return @column_types if @column_types

  types = {}
  fields = @result.fields
  fields.each_with_index do |fname, i|
    ftype = @result.ftype i
    fmod  = @result.fmod i
    types[fname] = @connection.get_type_map.fetch(ftype, fmod) { |oid, mod|
      warn "unknown OID: #{fname}(#{oid}) (#{sql})"
      OID::Identity.new
    }
  end

  @column_types = types
end

#each(&block) ⇒ Object

Public: Yields each row of the result set to the passed block

Yields the row to the block. The row is a hash with symbolized keys.

{colname: value, ....}

Returns the count of rows processed



64
65
66
67
68
69
70
# File 'lib/postgresql_cursor/cursor.rb', line 64

def each(&block)
  if @iterate == :each_row
    self.each_row(&block)
  else
    self.each_instance(@type, &block)
  end
end

#each_instance(klass = nil, &block) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/postgresql_cursor/cursor.rb', line 79

def each_instance(klass=nil, &block)
  klass ||= @type
  self.each_tuple do |row|
    if ::ActiveRecord::VERSION::MAJOR < 4
      model = klass.send(:instantiate,row)
    else
      @column_types ||= column_types
      model = klass.send(:instantiate, row, @column_types)
    end
    block.call(model)
  end
end

#each_row(&block) ⇒ Object



72
73
74
75
76
77
# File 'lib/postgresql_cursor/cursor.rb', line 72

def each_row(&block)
  self.each_tuple do |row|
    row = row.symbolize_keys if @options[:symbolize_keys]
    block.call(row)
  end
end

#each_tuple(&block) ⇒ Object

:nodoc:



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/postgresql_cursor/cursor.rb', line 113

def each_tuple(&block) #:nodoc:
  has_do_until  = @options.has_key?(:until)
  has_do_while  = @options.has_key?(:while)
  @count        = 0
  @column_types = nil
  @connection.transaction do
    begin
      open
      while (row = fetch) do
        break if row.size==0
        @count += 1
        rc = block.call(row)
        break if has_do_until && rc == @options[:until]
        break if has_do_while && rc != @options[:while]
      end
    rescue Exception => e
      raise e
    ensure
      close
    end
  end
  @count
end

#fetchObject

Public: Returns the next row from the cursor, or empty hash if end of results

Returns a row as a hash of ‘colname’=>value,…



170
171
172
173
# File 'lib/postgresql_cursor/cursor.rb', line 170

def fetch
  fetch_block if @block.size==0
  @block.shift
end

#fetch_block(block_size = nil) ⇒ Object

Private: Fetches the next block of rows into @block



176
177
178
179
180
# File 'lib/postgresql_cursor/cursor.rb', line 176

def fetch_block(block_size=nil)
  block_size ||= @block_size ||= @options.fetch(:block_size) { 1000 }
  @result = @connection.execute("fetch #{block_size} from cursor_#{@cursor}")
  @block = @result.collect {|row| row } # Make our own
end

#iterate_type(type = nil) ⇒ Object

Specify the type to instantiate, or reset to return a Hash



48
49
50
51
52
53
54
55
56
# File 'lib/postgresql_cursor/cursor.rb', line 48

def iterate_type(type=nil)
  if type.nil? || type == Hash
    @iterate = :each_row
  else
    @iterate = :each_instance
    @type    = type
  end
  self
end

#openObject

Public: Opens (actually, “declares”) the cursor. Call this before fetching



160
161
162
163
164
165
# File 'lib/postgresql_cursor/cursor.rb', line 160

def open
  set_cursor_tuple_fraction
  @cursor = @@cursor_seq += 1
  @result = @connection.execute("declare cursor_#{@cursor} cursor for #{@sql}")
  @block = []
end

#pluck(*cols) ⇒ Object

Returns an array of columns plucked from the result rows. Experimental function, as this could still use too much memory and negate the purpose of this libarary. Should this return a lazy enumerator instead?



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/postgresql_cursor/cursor.rb', line 96

def pluck(*cols)
  options = cols.last.is_a?(Hash) ? cols.pop : {}
  @options.merge!(options)
  @options[:symbolize_keys] = true
  self.iterate_type(options[:class]) if options[:class]
  cols    = cols.map {|c| c.to_sym }
  result  = []

  self.each() do |row|
    row = row.symbolize_keys if row.is_a?(Hash)
    result << cols.map { |c| row[c] }
  end

  result.flatten! if cols.size == 1
  result
end

#set_cursor_tuple_fraction(frac = 1.0) ⇒ Object

Private: Sets the PostgreSQL cursor_tuple_fraction value = 1.0 to assume all rows will be fetched This is a value between 0.1 and 1.0 (PostgreSQL defaults to 0.1, this library defaults to 1.0) used to determine the expected fraction (percent) of result rows returned the the caller. This value determines the access path by the query planner.



191
192
193
194
195
196
197
# File 'lib/postgresql_cursor/cursor.rb', line 191

def set_cursor_tuple_fraction(frac=1.0)
  @cursor_tuple_fraction ||= @options.fetch(:fraction) { 1.0 }
  return @cursor_tuple_fraction if frac == @cursor_tuple_fraction
  @cursor_tuple_fraction = frac
  @result = @connection.execute("set cursor_tuple_fraction to  #{frac}")
  frac
end