Class: PostgreSQLCursor::Cursor

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/postgresql_cursor/cursor.rb

Constant Summary collapse

@@cursor_seq =
0

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sql, options = {}) ⇒ Cursor

Public: Start a new PostgreSQL cursor query sql - The SQL statement with interpolated values options - hash of processing controls

while: value    - Exits loop when block does not return this value.
until: value    - Exits loop when block returns this value.
fraction: 0.1..1.0    - The cursor_tuple_fraction (default 1.0)
block_size: 1..n      - The number of rows to fetch per db block fetch
                        Defaults to 1000
with_hold       - Allows the query to remain open across commit points.

Examples

PostgreSQLCursor::Cursor.new("select ....")

Returns the cursor object when called with new.



41
42
43
44
45
46
47
# File 'lib/postgresql_cursor/cursor.rb', line 41

def initialize(sql, options={})
  @sql        = sql
  @options    = options
  @connection = @options.fetch(:connection) { ::ActiveRecord::Base.connection }
  @count      = 0
  @iterate    = options[:instances] ? :each_instance : :each_row
end

Instance Attribute Details

#connectionObject (readonly)

Returns the value of attribute connection.



23
24
25
# File 'lib/postgresql_cursor/cursor.rb', line 23

def connection
  @connection
end

#countObject (readonly)

Returns the value of attribute count.



23
24
25
# File 'lib/postgresql_cursor/cursor.rb', line 23

def count
  @count
end

#optionsObject (readonly)

Returns the value of attribute options.



23
24
25
# File 'lib/postgresql_cursor/cursor.rb', line 23

def options
  @options
end

#resultObject (readonly)

Returns the value of attribute result.



23
24
25
# File 'lib/postgresql_cursor/cursor.rb', line 23

def result
  @result
end

#sqlObject (readonly)

Returns the value of attribute sql.



23
24
25
# File 'lib/postgresql_cursor/cursor.rb', line 23

def sql
  @sql
end

Instance Method Details

#cast_types(row) ⇒ Object



139
140
141
# File 'lib/postgresql_cursor/cursor.rb', line 139

def cast_types(row)
  row
end

#closeObject

Public: Closes the cursor



189
190
191
# File 'lib/postgresql_cursor/cursor.rb', line 189

def close
  @connection.execute("close cursor_#{@cursor}")
end

#column_typesObject



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/postgresql_cursor/cursor.rb', line 143

def column_types
  return nil if ::ActiveRecord::VERSION::MAJOR < 4
  return @column_types if @column_types

  types = {}
  fields = @result.fields
  fields.each_with_index do |fname, i|
    ftype = @result.ftype i
    fmod  = @result.fmod i
    types[fname] = @connection.get_type_map.fetch(ftype, fmod) { |oid, mod|
      warn "unknown OID: #{fname}(#{oid}) (#{sql})"
      OID::Identity.new
    }
  end

  @column_types = types
end

#each(&block) ⇒ Object

Public: Yields each row of the result set to the passed block

Yields the row to the block. The row is a hash with symbolized keys.

{colname: value, ....}

Returns the count of rows processed



66
67
68
69
70
71
72
# File 'lib/postgresql_cursor/cursor.rb', line 66

def each(&block)
  if @iterate == :each_row
    self.each_row(&block)
  else
    self.each_instance(@type, &block)
  end
end

#each_instance(klass = nil, &block) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/postgresql_cursor/cursor.rb', line 81

def each_instance(klass=nil, &block)
  klass ||= @type
  self.each_tuple do |row|
    if ::ActiveRecord::VERSION::MAJOR < 4
      model = klass.send(:instantiate,row)
    else
      @column_types ||= column_types
      model = klass.send(:instantiate, row, @column_types)
    end
    block.call(model)
  end
end

#each_row(&block) ⇒ Object



74
75
76
77
78
79
# File 'lib/postgresql_cursor/cursor.rb', line 74

def each_row(&block)
  self.each_tuple do |row|
    row = row.symbolize_keys if @options[:symbolize_keys]
    block.call(row)
  end
end

#each_tuple(&block) ⇒ Object

:nodoc:



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/postgresql_cursor/cursor.rb', line 115

def each_tuple(&block) #:nodoc:
  has_do_until  = @options.has_key?(:until)
  has_do_while  = @options.has_key?(:while)
  @count        = 0
  @column_types = nil
  @connection.transaction do
    begin
      open
      while (row = fetch) do
        break if row.size==0
        @count += 1
        rc = block.call(row)
        break if has_do_until && rc == @options[:until]
        break if has_do_while && rc != @options[:while]
      end
    rescue Exception => e
      raise e
    ensure
      close if @block
    end
  end
  @count
end

#fetch(options = {}) ⇒ Object

Public: Returns the next row from the cursor, or empty hash if end of results

Returns a row as a hash of ‘colname’=>value,…



173
174
175
176
177
178
179
# File 'lib/postgresql_cursor/cursor.rb', line 173

def fetch(options={})
  open unless @block
  fetch_block if @block.size==0
  row = @block.shift
  row = row.symbolize_keys if row && options[:symbolize_keys]
  row
end

#fetch_block(block_size = nil) ⇒ Object

Private: Fetches the next block of rows into @block



182
183
184
185
186
# File 'lib/postgresql_cursor/cursor.rb', line 182

def fetch_block(block_size=nil)
  block_size ||= @block_size ||= @options.fetch(:block_size) { 1000 }
  @result = @connection.execute("fetch #{block_size} from cursor_#{@cursor}")
  @block = @result.collect {|row| row } # Make our own
end

#iterate_type(type = nil) ⇒ Object

Specify the type to instantiate, or reset to return a Hash



50
51
52
53
54
55
56
57
58
# File 'lib/postgresql_cursor/cursor.rb', line 50

def iterate_type(type=nil)
  if type.nil? || type == Hash
    @iterate = :each_row
  else
    @iterate = :each_instance
    @type    = type
  end
  self
end

#openObject

Public: Opens (actually, “declares”) the cursor. Call this before fetching



162
163
164
165
166
167
168
# File 'lib/postgresql_cursor/cursor.rb', line 162

def open
  set_cursor_tuple_fraction
  @cursor = @@cursor_seq += 1
  hold = @options[:with_hold] ? 'with hold ' : ''
  @result = @connection.execute("declare cursor_#{@cursor} cursor #{hold}for #{@sql}")
  @block = []
end

#pluck(*cols) ⇒ Object

Returns an array of columns plucked from the result rows. Experimental function, as this could still use too much memory and negate the purpose of this libarary. Should this return a lazy enumerator instead?



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/postgresql_cursor/cursor.rb', line 98

def pluck(*cols)
  options = cols.last.is_a?(Hash) ? cols.pop : {}
  @options.merge!(options)
  @options[:symbolize_keys] = true
  self.iterate_type(options[:class]) if options[:class]
  cols    = cols.map {|c| c.to_sym }
  result  = []

  self.each() do |row|
    row = row.symbolize_keys if row.is_a?(Hash)
    result << cols.map { |c| row[c] }
  end

  result.flatten! if cols.size == 1
  result
end

#set_cursor_tuple_fraction(frac = 1.0) ⇒ Object

Private: Sets the PostgreSQL cursor_tuple_fraction value = 1.0 to assume all rows will be fetched This is a value between 0.1 and 1.0 (PostgreSQL defaults to 0.1, this library defaults to 1.0) used to determine the expected fraction (percent) of result rows returned the the caller. This value determines the access path by the query planner.



197
198
199
200
201
202
203
# File 'lib/postgresql_cursor/cursor.rb', line 197

def set_cursor_tuple_fraction(frac=1.0)
  @cursor_tuple_fraction ||= @options.fetch(:fraction) { 1.0 }
  return @cursor_tuple_fraction if frac == @cursor_tuple_fraction
  @cursor_tuple_fraction = frac
  @result = @connection.execute("set cursor_tuple_fraction to  #{frac}")
  frac
end