Class: EventMachine::Protocols::PostgresConnection

Inherits:
Connection
  • Object
show all
Includes:
PostgresPR
Defined in:
lib/postgres_connection.rb

Instance Method Summary collapse

Constructor Details

#initializePostgresConnection

Returns a new instance of PostgresConnection.



33
34
35
36
37
# File 'lib/postgres_connection.rb', line 33

def initialize
  @data = ""
  @params = {}
  @connected = false
end

Instance Method Details

#closeObject



63
64
65
# File 'lib/postgres_connection.rb', line 63

def close
  close_connection
end

#closed?Boolean

Returns:

  • (Boolean)


67
68
69
# File 'lib/postgres_connection.rb', line 67

def closed?
  !@connected
end

#connect(db, user, psw = nil) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/postgres_connection.rb', line 82

def connect(db, user, psw=nil)
  d = EM::DefaultDeferrable.new
  d.timeout 15

  if @pending_query || @pending_conn
    d.fail "Operation already in progress"
  else
    @pending_conn = d
    prms = {"user"=>user, "database"=>db}
    @user = user
    if psw
      @password = psw
      #prms["password"] = psw
    end
    send_data PostgresPR::StartupMessage.new( 3 << 16, prms ).dump
  end

  d
end

#dispatch_conn_message(msg) ⇒ Object

Cloned and modified from the postgres-pr.



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/postgres_connection.rb', line 141

def dispatch_conn_message(msg)
  case msg
  when AuthentificationClearTextPassword
    raise ArgumentError, "no password specified" if @password.nil?
    send_data PasswordMessage.new(@password).dump

  when AuthentificationCryptPassword
    raise ArgumentError, "no password specified" if @password.nil?
    send_data PasswordMessage.new(@password.crypt(msg.salt)).dump

  when AuthentificationMD5Password
    raise ArgumentError, "no password specified" if @password.nil?
    require 'digest/md5'

    m = Digest::MD5.hexdigest(@password + @user)
    m = Digest::MD5.hexdigest(m + msg.salt)
    m = 'md5' + m
    send_data PasswordMessage.new(m).dump

  when AuthentificationKerberosV4, AuthentificationKerberosV5, AuthentificationSCMCredential
    raise "unsupported authentification"

  when AuthentificationOk
  when ErrorResponse
    raise msg.field_values.join("\t")
  when NoticeResponse
    @notice_processor.call(msg) if @notice_processor
  when ParameterStatus
    @params[msg.key] = msg.value
  when BackendKeyData
    # TODO
    #p msg
  when ReadyForQuery
    # TODO: use transaction status
    pc,@pending_conn = @pending_conn,nil
    pc.succeed true
  else
    raise "unhandled message type"
  end
end

#dispatch_query_message(msg) ⇒ Object

Cloned and modified from the postgres-pr.



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/postgres_connection.rb', line 183

def dispatch_query_message(msg)
  case msg
  when DataRow
    @r.rows << msg.columns
  when CommandComplete
    @r.cmd_tag = msg.cmd_tag
  when ReadyForQuery
    pq,@pending_query = @pending_query,nil
    pq.succeed @e.size == 0, @r, @e
  when RowDescription
    @r.fields = msg.fields
  when CopyInResponse
  when CopyOutResponse
  when EmptyQueryResponse
  when ErrorResponse
    @e << msg.field_values[2]
  when NoticeResponse
    @notice_processor.call(msg) if @notice_processor
  when ParameterStatus
  else
    # TODO
    puts "Unknown Postgres message: #{msg}"
  end
end

#exec(sql) ⇒ Object

Fibered impl for synchronous execution of SQL within EM

Raises:

  • (RuntimeError)


40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/postgres_connection.rb', line 40

def exec(sql)
  fiber = Fiber.current
#        p [fiber.object_id, self.object_id, sql]
  yielding = true
  (status, result, errors) = nil
  d = query(sql)
  d.callback do |s, r, e|
    (status, result, errors) = s, r, e
    fiber.resume
  end
  d.errback do |msg|
    errors = msg
    status = false
    # errback is called from the same fiber
    yielding = false
  end
  
  Fiber.yield if yielding
#        p [fiber.object_id, self.object_id, result]
  return PGresult.new(result) if status
  raise RuntimeError, (errors || result).inspect
end

#post_initObject



71
72
73
# File 'lib/postgres_connection.rb', line 71

def post_init
  @connected = true
end

#query(sql) ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/postgres_connection.rb', line 102

def query(sql)
  d = EM::DefaultDeferrable.new
  d.timeout 15

  if !@connected
    d.fail "Not connected"
  elsif @pending_query || @pending_conn
    d.fail "Operation already in progress"
  else
    @r = PostgresPR::Connection::Result.new
    @e = []
    @pending_query = d
    send_data PostgresPR::Query.dump(sql)
  end

  d
end

#receive_data(data) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/postgres_connection.rb', line 120

def receive_data(data)
  @data << data
  while @data.length >= 5
    pktlen = @data[1...5].unpack("N").first
    if @data.length >= (1 + pktlen)
      pkt = @data.slice!(0...(1+pktlen))
      m = StringIO.open( pkt, "r" ) {|io| PostgresPR::Message.read( io ) }
      if @pending_conn
        dispatch_conn_message m
      elsif @pending_query
        dispatch_query_message m
      else
        raise "Unexpected message from database"
      end
    else
      break # very important, break out of the while
    end
  end
end

#unbindObject



75
76
77
78
79
80
# File 'lib/postgres_connection.rb', line 75

def unbind
  @connected = false
  if o = (@pending_query || @pending_conn)
    o.succeed false, "lost connection"
  end
end