Class: Theman::Agency

Inherits:
Object
  • Object
show all
Defined in:
lib/theman/agency.rb,
lib/theman/agency/table.rb,
lib/theman/agency/columns.rb

Defined Under Namespace

Classes: Columns, Table

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(conn, stream, options = {}, &block) ⇒ Agency

Creates a new agent object. If a block is passed, the agent is yielded to it and create! is then called.

Parameters

  • conn - A database connection from the PGconn class or ActiveRecord::Base.connection.raw_connection which is the same class.

  • stream - path to the data file.

  • options - Additional options are :temporary, :on_commit and :headers

Examples

# Load sample.csv into a newly created table and count the imported rows
conn  = PGconn.open(:dbname => 'test')
agent = Theman::Agency.new(conn, 'sample.csv')
agent.create!
res = conn.exec("SELECT count(*) FROM #{agent.table_name}")
res.getvalue(0,0)


21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/theman/agency.rb', line 21

# Sets up an agent that will load +stream+ into a randomly named table.
#
# conn    - database connection; kept in @connection and passed to Columns.new.
# stream  - path to the data file.
# options - Hash; :headers, :temporary and :on_commit are read by other methods.
#
# When a block is given, the agent is yielded to it and create! runs right
# after the block returns.
def initialize(conn, stream, options = {}, &block)
  @connection = conn
  @stream     = stream
  @options    = options

  @stream_columns_set = false
  @columns            = Columns.new(conn)
  # "agent" followed by a random number zero-padded to ten digits.
  @table_name         = format("agent%010d", rand(100_000_000))

  return unless block_given?

  yield self
  create!
end

Instance Attribute Details

#columnsObject (readonly)

Returns the value of attribute columns.



3
4
5
# File 'lib/theman/agency.rb', line 3

# Read-only accessor for @columns, the Columns instance the constructor
# builds from the connection.
def columns
  @columns
end

#connectionObject (readonly)

Returns the value of attribute connection.



3
4
5
# File 'lib/theman/agency.rb', line 3

# Read-only accessor for @connection, the conn argument handed to the
# constructor; every SQL statement in this class is sent through it.
def connection
  @connection
end

#table_nameObject (readonly)

Returns the value of attribute table_name.



3
4
5
# File 'lib/theman/agency.rb', line 3

# Read-only accessor for @table_name: the name generated in the constructor
# ("agent" plus a zero-padded random number), or nil once drop! has run.
def table_name
  @table_name
end

Instance Method Details

#add_primary_key!Object

Adds a serial column called id and sets it as the primary key. If your data already has a column called id, the new column is called agents_pkey instead.



130
131
132
133
# File 'lib/theman/agency.rb', line 130

# Adds a serial PRIMARY KEY column to the agent's table.
# The column is named "id", unless @columns already includes :id, in which
# case it is named "agents_pkey".
#
# Returns the result of connection.exec.
def add_primary_key!
  key_column =
    if @columns.include?(:id)
      "agents_pkey"
    else
      "id"
    end
  connection.exec("ALTER TABLE #{table_name} ADD COLUMN #{key_column} serial PRIMARY KEY;")
end

#analyze!Object

Analyzes the table for efficient query construction on tables larger than ~1000 tuples.



136
137
138
# File 'lib/theman/agency.rb', line 136

# Runs ANALYZE on the agent's table and returns the result of connection.exec.
def analyze!
  statement = "ANALYZE #{table_name};"
  connection.exec(statement)
end

#create!Object

Loads the stream with the Postgres COPY command, using STDIN

  • reads chunks of 8192 bytes to save memory

The system commands run as piped IO subprocesses to take advantage of multiple cores.



120
121
122
123
124
125
126
# File 'lib/theman/agency.rb', line 120

# Creates the table and loads the stream into it.
#
# Header-derived columns are added first, unless they were already added or
# options[:headers] is false. The CREATE statement comes from Table#to_sql,
# and pipe_it then performs the COPY.
#
# Returns the result of pipe_it.
def create!
  create_stream_columns if !@stream_columns_set && @options[:headers] != false

  definition = Table.new(table_name, @columns.to_sql, @options[:temporary], @options[:on_commit])
  connection.exec(definition.to_sql)
  pipe_it
end

#create_stream_columnsObject

:nodoc



43
44
45
46
47
48
# File 'lib/theman/agency.rb', line 43

# Marks the stream columns as set, then registers one string column on
# @columns for every field of the header line (split on delimiter_regexp).
def create_stream_columns #:nodoc
  @stream_columns_set = true
  column_names = headers.split(delimiter_regexp)
  column_names.each { |name| @columns.string(name) }
end

#datestyle(arg) ⇒ Object

datestyle of date columns



67
68
69
# File 'lib/theman/agency.rb', line 67

# Stores arg in @datestyle. When it is set, psql_command issues
# "SET DATESTYLE TO <arg>" ahead of the COPY statement.
#
# Returns arg.
def datestyle(arg)
  @datestyle = arg
end

#delimiter(arg) ⇒ Object

Delimiter used in the stream - comma is the default.



82
83
84
# File 'lib/theman/agency.rb', line 82

# Sets the field delimiter used in the stream; psql_copy passes it to COPY
# and delimiter_regexp uses it to split the header line (comma when unset).
#
# delimiter_regexp memoizes its result, so the cached regexp is cleared
# here. Otherwise a delimiter set after the regexp was first built would be
# ignored when splitting headers.
#
# Returns arg.
def delimiter(arg)
  @delimiter_regexp = nil
  @delimiter = arg
end

#delimiter_regexpObject

:nodoc



112
113
114
# File 'lib/theman/agency.rb', line 112

# Regexp used to split the header line into column names: a comma when no
# delimiter was set, otherwise the delimiter matched literally.
#
# Regexp.escape is used rather than prepending a backslash, because a
# backslash turns letters into character classes or escapes (delimiter "d"
# would become /\d/ and match digits) and only covers the first character.
#
# The result is memoized in @delimiter_regexp.
def delimiter_regexp #:nodoc
  @delimiter_regexp ||= Regexp.new(@delimiter.nil? ? "," : Regexp.escape(@delimiter.to_s))
end

#drop!Object

explicitly drop table



141
142
143
144
# File 'lib/theman/agency.rb', line 141

# Drops the agent's table and then clears @table_name.
#
# Returns nil.
def drop!
  statement = "DROP TABLE #{table_name};"
  connection.exec(statement)
  @table_name = nil
end

#headersObject

:nodoc



50
51
52
# File 'lib/theman/agency.rb', line 50

# Returns the first line of the file at @stream exactly as IO#gets reads it
# (line terminator included), or nil when the file is empty. The file is
# closed when the block-form open returns.
def headers #:nodoc
  File.open(@stream, "r", &:gets)
end

#nulls(*args) ⇒ Object

values in stream to replace with NULL



72
73
74
# File 'lib/theman/agency.rb', line 72

# Stores the given patterns in @nulls. nulls_to_sed calls #source on each
# entry, so they are expected to be Regexp-like, and turns each one into a
# sed expression that deletes every match from the stream.
#
# Returns the Array of arguments.
def nulls(*args)
  @nulls = args
end

#nulls_to_sedObject

:nodoc



106
107
108
109
110
# File 'lib/theman/agency.rb', line 106

# Builds one sed "-e" expression per entry of @nulls; each expression
# deletes every match of that entry's regexp source.
def nulls_to_sed #:nodoc
  @nulls.map { |pattern| "-e 's/#{pattern.source}//g'" }
end

#pipe_it(l = "") ⇒ Object

:nodoc



154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/theman/agency.rb', line 154

# Starts the COPY on the connection and streams the output of
# system_command into it in chunks of 8192 bytes.
#
# l - String buffer reused for every chunk. It must be mutable, so the
#     default is String.new rather than a literal that may be frozen.
#
# Returns the result of connection.put_copy_end.
def pipe_it(l = String.new) #:nodoc
  connection.exec psql_command.join("; ")
  # IO#read(length, buffer) reports end of stream by returning nil, not by
  # raising EOFError, so a close placed in a rescue never runs. The block
  # form of popen closes the pipe and reaps the child on every exit path,
  # including when put_copy_data raises.
  IO.popen(system_command) do |pipe|
    connection.put_copy_data l while pipe.read(8192, l)
  end
  connection.put_copy_end
end

#psql_command(psql = []) ⇒ Object

:nodoc



94
95
96
97
98
# File 'lib/theman/agency.rb', line 94

# Appends the SQL statements for the load to +psql+ and returns it: an
# optional "SET DATESTYLE TO ..." (only when @datestyle is set), followed by
# the COPY statement assembled from psql_copy.
def psql_command(psql = []) #:nodoc
  statements = []
  statements << "SET DATESTYLE TO #{@datestyle}" unless @datestyle.nil?
  statements << psql_copy.join(" ")
  psql.concat(statements)
end

#psql_copy(psql = []) ⇒ Object

:nodoc



86
87
88
89
90
91
92
# File 'lib/theman/agency.rb', line 86

# Appends the pieces of the COPY ... FROM STDIN statement to +psql+ and
# returns it. DELIMITER is included only when @delimiter is set, and HEADER
# is left out when options[:headers] is false.
def psql_copy(psql = []) #:nodoc
  clauses = ["COPY #{table_name} FROM STDIN WITH"]
  clauses << "DELIMITER '#{@delimiter}'" unless @delimiter.nil?
  clauses << "CSV"
  clauses << "HEADER" unless @options[:headers] == false
  psql.concat(clauses)
end

#sed_command(sed = []) ⇒ Object

:nodoc



100
101
102
103
104
# File 'lib/theman/agency.rb', line 100

# Appends the sed stages to +sed+ and returns it. The Array from
# nulls_to_sed (when @nulls is set) and the @seds Array (when set) are each
# added as a single nested element.
def sed_command(sed = []) #:nodoc
  stages = []
  stages << nulls_to_sed unless @nulls.nil?
  stages << @seds unless @seds.nil?
  sed.concat(stages)
end

#seds(*args) ⇒ Object

custom seds to parse stream with



77
78
79
# File 'lib/theman/agency.rb', line 77

# Stores the given arguments in @seds. sed_command appends them, and
# system_command runs each entry as the arguments of its own `sed` stage in
# the shell pipeline.
#
# Returns the Array of arguments.
def seds(*args)
  @seds = args
end

#stream(arg) ⇒ Object

the location of the data to be sent to Postgres via STDIN (requires a header row)



62
63
64
# File 'lib/theman/agency.rb', line 62

# Replaces @stream, the path that headers opens to read the first line and
# that system_command passes to `cat`.
#
# Returns arg.
def stream(arg)
  @stream = arg
end

#system_commandObject

:nodoc



146
147
148
149
150
151
152
# File 'lib/theman/agency.rb', line 146

# Builds the shell pipeline whose output is fed to COPY: `cat` of the
# stream, followed by one `sed` stage per entry of sed_command.
#
# The stream path is shell-escaped so that paths containing spaces or shell
# metacharacters are passed to cat as a single literal argument.
#
# Returns the command String.
def system_command #:nodoc
  require "shellwords" # stdlib; loaded here because only this method needs it

  source  = "cat #{Shellwords.escape(@stream.to_s)}"
  filters = sed_command
  return source if filters.empty?

  # Array#join flattens the nested arrays, so every sed expression becomes
  # its own "| sed ..." stage.
  "#{source} | sed #{filters.join(" | sed ")}"
end

#table {|@columns| ... } ⇒ Object

create default columns from stream and replace selected columns with custom data types from block

Yields:



56
57
58
59
# File 'lib/theman/agency.rb', line 56

# Creates the default (string) columns from the stream's header line, then
# yields @columns so selected columns can be redefined with custom types.
#
# The header columns are derived at most once: like create!, this skips
# create_stream_columns when it has already run (or when options[:headers]
# is false), so a second call does not add them again.
#
# Returns the value of the block.
def table(&block)
  headers_wanted = @options[:headers] != false
  create_stream_columns if headers_wanted && !@stream_columns_set
  yield @columns
end

#transaction(&block) ⇒ Object

create a transaction block for use with :on_commit => :drop



37
38
39
40
41
# File 'lib/theman/agency.rb', line 37

# Runs the block between BEGIN and COMMIT (for use with :on_commit => :drop).
#
# If the block, or the COMMIT itself, does not complete, a ROLLBACK is
# issued so the connection is not left inside an open or aborted
# transaction; the original exception still propagates.
#
# Returns the result of the COMMIT exec.
def transaction(&block)
  connection.exec "BEGIN;"
  finished = false
  begin
    yield
    outcome = connection.exec("COMMIT;")
    finished = true
    outcome
  ensure
    connection.exec "ROLLBACK;" unless finished
  end
end