Class: EM::PG

Inherits:
Object
  • Object
show all
Includes:
Deferrable
Defined in:
lib/em/pg.rb

Defined Under Namespace

Classes: BadConnectionStatusError, BadPollStatusError, BadStateError, ConnectionRefusedError, DisconnectError, Error, PGError, Query, UnexpectedStateError, Watcher

Constant Summary collapse

VERSION =
'0.1.0'

Class Attribute Summary collapse

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts) ⇒ PG

Returns a new instance of PG.



65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/em/pg.rb', line 65

# Starts a non-blocking PostgreSQL connection and registers its socket with
# EventMachine, then performs the first handshake poll.
#
# @param opts [Hash] options passed to ::PG::Connection.connect_start after
#   two keys are removed from a copy of the hash:
#   :logger        - logger for this instance (optional)
#   :on_disconnect - object responding to #call, invoked by #unbind (optional)
def initialize(opts)
  # Copy first so removing our own keys never mutates the caller's hash.
  opts = opts.dup
  # NOTE(review): the original fallback was EM::Postgres.logger, a constant
  # that is not defined anywhere in this class's documented namespace. It is
  # still preferred when it exists; otherwise the class-level EM::PG.logger is
  # used instead of failing with NameError.
  @logger = opts.delete(:logger) ||
            (defined?(EM::Postgres) ? EM::Postgres.logger : EM::PG.logger)
  @on_disconnect = opts.delete(:on_disconnect)
  @opts = opts
  @state = :connecting

  @pg = ::PG::Connection.connect_start(@opts)
  @queue = []

  @watcher = EM.watch(@pg.socket, Watcher, self)
  @watcher.notify_readable = true
  check_connect
end

Class Attribute Details

.logger ⇒ Object

Returns the value of attribute logger.



60
61
62
# File 'lib/em/pg.rb', line 60

# Class-level reader (listed under the class attributes of this class).
#
# @return [Object, nil] the value currently stored in the class-level @logger
def logger
  @logger
end

Instance Attribute Details

#conn ⇒ Object

Returns the value of attribute conn.



64
65
66
# File 'lib/em/pg.rb', line 64

# Reader for the @conn instance variable.
#
# NOTE(review): none of the visible methods assign @conn (the connection
# object is kept in @pg) — confirm whether this attribute is still in use.
#
# @return [Object, nil] the current value of @conn
def conn
  @conn
end

#logger ⇒ Object

Returns the value of attribute logger.



64
65
66
# File 'lib/em/pg.rb', line 64

# Reader for the logger chosen in the constructor; the instance calls
# #error on it when a connection is refused or lost.
#
# @return [Object] the value of @logger
def logger
  @logger
end

#on_disconnect ⇒ Object

Returns the value of attribute on_disconnect.



64
65
66
# File 'lib/em/pg.rb', line 64

# Reader for the disconnect callback taken from the :on_disconnect option.
# When set, #unbind invokes it with no arguments.
#
# @return [#call, nil] the value of @on_disconnect
def on_disconnect
  @on_disconnect
end

#opts ⇒ Object

Returns the value of attribute opts.



64
65
66
# File 'lib/em/pg.rb', line 64

# Reader for the connection options kept by the constructor: a copy of the
# hash it was given, with the :logger and :on_disconnect keys removed.
#
# @return [Hash] the value of @opts
def opts
  @opts
end

#pg ⇒ Object

Returns the value of attribute pg.



64
65
66
# File 'lib/em/pg.rb', line 64

# Reader for the underlying connection object, i.e. whatever
# ::PG::Connection.connect_start returned in the constructor.
#
# @return [Object] the value of @pg
def pg
  @pg
end

#state ⇒ Object

Returns the value of attribute state.



64
65
66
# File 'lib/em/pg.rb', line 64

# Reader for the connection state. The values assigned by the methods of this
# class are :connecting, :connected, :waiting, :connection_refused, :closed
# and :disconnected.
#
# @return [Symbol] the value of @state
def state
  @state
end

#watcher ⇒ Object

Returns the value of attribute watcher.



64
65
66
# File 'lib/em/pg.rb', line 64

# Reader for the object returned by EM.watch for the connection's socket.
#
# @return [Object] the value of @watcher
def watcher
  @watcher
end

Instance Method Details

#add_to_queue(query) ⇒ Object



140
141
142
# File 'lib/em/pg.rb', line 140

# Appends a query to the end of the pending queue.
#
# @param query [Object] the query to run later
# @return [Array] the queue itself, after the append
def add_to_queue(query)
  @queue.push(query)
end

#check_connect ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/em/pg.rb', line 99

# Advances the non-blocking connection handshake by one step.
#
# Polls the connection once and reacts to the reported polling status:
# * OK      - calls #connected or #connection_refused depending on the
#             connection status
# * READING - nothing to do
# * WRITING - asks the watcher to also notify when the socket is writable
# * FAILED  - detaches the watcher and calls #connection_refused
#
# @raise [BadConnectionStatusError] polling finished but the connection
#   status is neither CONNECTION_OK nor CONNECTION_BAD
# @raise [BadPollStatusError] connect_poll returned an unrecognised status
def check_connect
  case @pg.connect_poll
  when ::PG::PGRES_POLLING_OK
    conn_status = @pg.status
    if conn_status == ::PG::CONNECTION_OK
      connected
    elsif conn_status == ::PG::CONNECTION_BAD
      connection_refused
    else
      raise BadConnectionStatusError
    end
  when ::PG::PGRES_POLLING_READING
    # Nothing to do: the constructor already enabled readable notifications.
  when ::PG::PGRES_POLLING_WRITING
    @watcher.notify_writable = true
  when ::PG::PGRES_POLLING_FAILED
    @watcher.detach
    connection_refused
  else
    # The original raised the misspelled BadPollStatsError, which would have
    # surfaced as a NameError instead of the intended error class.
    raise BadPollStatusError
  end
end

#close ⇒ Object



202
203
204
205
206
207
# File 'lib/em/pg.rb', line 202

# Shuts the connection down on request.
#
# Marks the state as :closed, detaches the socket watcher, finishes the
# underlying connection, and finally fails the current and queued queries
# with the reason :closed.
#
# @return [Object] whatever #fail_queries returns
def close
  @state = :closed
  @watcher.detach
  @pg.finish
  fail_queries(:closed)
end

#connected ⇒ Object



182
183
184
185
# File 'lib/em/pg.rb', line 182

# Records that the handshake finished successfully: switches the state to
# :connected and then calls #succeed with :connected.
#
# @return [Object] whatever #succeed returns
def connected
  @state = :connected
  succeed(:connected)
end

#connection_refused ⇒ Object



187
188
189
190
191
# File 'lib/em/pg.rb', line 187

# Records a failed connection attempt.
#
# Switches the state to :connection_refused, logs the connection's error
# message, and calls #fail with a ConnectionRefusedError carrying that message.
#
# @return [Object] whatever #fail returns
def connection_refused
  @state = :connection_refused
  reason = @pg.error_message
  logger.error([:connection_refused, reason])
  fail(ConnectionRefusedError.new(message: reason))
end

#consume_result(&clb) ⇒ Object



158
159
160
161
162
163
164
165
166
167
168
# File 'lib/em/pg.rb', line 158

# Reads pending input from the connection and, unless the connection reports
# that it is still busy, passes the last result to the callback.
#
# Any ::PG::Error raised along the way is wrapped in a PGError (under the
# :original key) and passed to the callback instead. The rescue also covers
# the callback invocation itself, so a ::PG::Error raised inside the callback
# results in a second callback invocation with the wrapped error.
#
# @param clb [Proc] callback receiving either the result or a PGError
# @return [Object, nil] the callback's return value, or nil while busy
def consume_result(&clb)
  @pg.consume_input # can raise exceptions
  # get_last_result can raise exceptions as well
  clb.call(@pg.get_last_result) unless @pg.is_busy
rescue ::PG::Error => e
  clb.call(PGError.new(original: e))
end

#fail_queries(exc) ⇒ Object



209
210
211
212
# File 'lib/em/pg.rb', line 209

# Fails the in-flight query (if there is one) and every queued query with the
# given reason. The queue itself is left untouched.
#
# @param exc [Object] reason passed to each query's #fail
# @return [Array] the queue
def fail_queries(exc)
  in_flight = @current_query
  in_flight.fail(exc) if in_flight
  @queue.each do |pending|
    pending.fail(exc)
  end
end

#handle ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/em/pg.rb', line 80

# Dispatches on the current state:
# * :connecting - continue the connection handshake
# * :waiting    - consume the result and hand it to the current query
# * otherwise   - no result is expected; still try to consume one, since it
#                 may be a close message. An exception leads to #unbind,
#                 anything else is reported through #error.
#
# @return [Object] the value of whichever branch ran
def handle
  if @state == :connecting
    check_connect
  elsif @state == :waiting
    consume_result { |res| result_for_query(res) }
  else
    consume_result do |res|
      next unbind(res) if res.is_a?(Exception)

      error "Result in unexpected state #{@state}: #{res.inspect}"
    end
  end
end

#make_query(m, *args) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/em/pg.rb', line 127

# Builds a Query for the given connection method and arguments and routes it
# according to the current state:
# * :waiting   - another query is in flight, so the new one is queued
# * :connected - the query is run immediately
# * otherwise  - the query is failed with a BadStateError carrying the state
#
# @param m [Symbol] name of the connection method to invoke
# @param args [Array] arguments for that method
# @return [Query] the created query
def make_query(m, *args)
  query = Query.new(m, args)
  if @state == :waiting
    add_to_queue(query)
  elsif @state == :connected
    run_query!(query)
  else
    query.fail(BadStateError.new(state: @state))
  end
  query
end

#result_for_query(res) ⇒ Object



170
171
172
173
174
175
176
177
178
179
180
# File 'lib/em/pg.rb', line 170

# Completes the in-flight query with the given result and moves on.
#
# The state returns to :connected and the current-query slot is cleared
# before the query's callbacks run; afterwards the next queued query (if any)
# is started.
#
# @param res [Object] the result; an Exception fails the query, anything
#   else succeeds it
# @return [Object] whatever #try_next_from_queue returns
def result_for_query(res)
  @state = :connected
  finished, @current_query = @current_query, nil
  res.is_a?(Exception) ? finished.fail(res) : finished.succeed(res)
  try_next_from_queue
end

#run_query!(q) ⇒ Object



144
145
146
147
148
149
# File 'lib/em/pg.rb', line 144

# Makes the given query the in-flight one and dispatches it to the connection.
#
# Sets the state to :waiting, logs the call through #debug, and invokes the
# query's method on the connection with the query's arguments.
#
# @param q [Object] query responding to #method and #args
# @return [Object] whatever the invoked connection method returns
def run_query!(q)
  @state = :waiting
  @current_query = q
  meth = q.method
  debug(["EM::PG", meth, q.args])
  @pg.send(meth, *q.args)
end

#try_next_from_queue ⇒ Object



151
152
153
154
155
156
# File 'lib/em/pg.rb', line 151

# Removes the oldest queued query, if there is one, and runs it.
#
# @return [Object, nil] the result of #run_query!, or nil when the queue
#   was empty
def try_next_from_queue
  upcoming = @queue.shift
  run_query!(upcoming) if upcoming
end

#unbind(reason = nil) ⇒ Object



193
194
195
196
197
198
199
200
# File 'lib/em/pg.rb', line 193

# Handles loss of the connection. Does nothing when the state is already
# :disconnected.
#
# Otherwise it logs the reason, switches the state to :disconnected,
# detaches the socket watcher, invokes the on_disconnect callback when one
# was given, and fails the current and queued queries with a DisconnectError.
#
# @param reason [Object, nil] what caused the disconnect; only logged
# @return [Object, nil] whatever #fail_queries returns, or nil when already
#   disconnected
def unbind(reason = nil)
  return if @state == :disconnected

  logger.error([:disconnected, reason])
  @state = :disconnected
  @watcher.detach
  @on_disconnect.call if @on_disconnect
  fail_queries(DisconnectError.new)
end